This is an automated email from the ASF dual-hosted git repository. binlijin pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push: new 751cc62 HBASE-22280 Separate read/write handler for priority request(especial… (#202) 751cc62 is described below commit 751cc626113b59c082869d7c7f29a5a575a11ef6 Author: binlijin <binli...@gmail.com> AuthorDate: Mon Dec 9 16:11:35 2019 +0800 HBASE-22280 Separate read/write handler for priority request(especial… (#202) Signed-off-by: Yu Li <l...@apache.org> --- .../hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java | 52 ++++++++++++++++ .../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 14 ++++- .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 20 +++++-- .../hadoop/hbase/ipc/TestSimpleRpcScheduler.java | 69 ++++++++++++++++++++++ .../assignment/TestReportOnlineRegionsRace.java | 2 + 5 files changed, 150 insertions(+), 7 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java new file mode 100644 index 0000000..c9e4270 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +/** + * RPC Executor that uses different queues for reads and writes for meta. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class MetaRWQueueRpcExecutor extends RWQueueRpcExecutor { + public static final String META_CALL_QUEUE_READ_SHARE_CONF_KEY = + "hbase.ipc.server.metacallqueue.read.ratio"; + public static final String META_CALL_QUEUE_SCAN_SHARE_CONF_KEY = + "hbase.ipc.server.metacallqueue.scan.ratio"; + public static final float DEFAULT_META_CALL_QUEUE_READ_SHARE = 0.9f; + + public MetaRWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength, + final PriorityFunction priority, final Configuration conf, final Abortable abortable) { + super(name, handlerCount, maxQueueLength, priority, conf, abortable); + } + + @Override + protected float getReadShare(final Configuration conf) { + return conf.getFloat(META_CALL_QUEUE_READ_SHARE_CONF_KEY, DEFAULT_META_CALL_QUEUE_READ_SHARE); + } + + @Override + protected float getScanShare(final Configuration conf) { + return conf.getFloat(META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 3ce5f0e..5e7e2f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -71,8 +71,8 @@ public class RWQueueRpcExecutor extends RpcExecutor { final PriorityFunction priority, final Configuration conf, final Abortable abortable) { super(name, handlerCount, maxQueueLength, priority, conf, abortable); - float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0); - float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0); + float callqReadShare = getReadShare(conf); + float callqScanShare = getScanShare(conf); numWriteQueues = calcNumWriters(this.numCallQueues, callqReadShare); writeHandlersCount = Math.max(numWriteQueues, calcNumWriters(handlerCount, callqReadShare)); @@ -195,7 +195,7 @@ public class RWQueueRpcExecutor extends RpcExecutor { return activeScanHandlerCount.get(); } - private boolean isWriteRequest(final RequestHeader header, final Message param) { + protected boolean isWriteRequest(final RequestHeader header, final Message param) { // TODO: Is there a better way to do this? if (param instanceof MultiRequest) { MultiRequest multi = (MultiRequest)param; @@ -232,6 +232,14 @@ public class RWQueueRpcExecutor extends RpcExecutor { return param instanceof ScanRequest; } + protected float getReadShare(final Configuration conf) { + return conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0); + } + + protected float getScanShare(final Configuration conf) { + return conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0); + } + /* * Calculate the number of writers based on the "total count" and the read share. * You'll get at least one writer. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index fb52116..0c3e6c2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -97,10 +97,22 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs } } - // Create 2 queues to help priorityExecutor be more scalable. - this.priorityExecutor = priorityHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor( - "priority.FPBQ", priorityHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, - maxPriorityQueueLength, priority, conf, abortable) : null; + float metaCallqReadShare = + conf.getFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, + MetaRWQueueRpcExecutor.DEFAULT_META_CALL_QUEUE_READ_SHARE); + if (metaCallqReadShare > 0) { + // different read/write handler for meta, at least 1 read handler and 1 write handler + this.priorityExecutor = + new MetaRWQueueRpcExecutor("priority.RWQ", Math.max(2, priorityHandlerCount), + maxPriorityQueueLength, priority, conf, server); + } else { + // Create 2 queues to help priorityExecutor be more scalable. + this.priorityExecutor = priorityHandlerCount > 0 ? + new FastPathBalancedQueueRpcExecutor("priority.FPBQ", priorityHandlerCount, + RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf, + abortable) : + null; + } this.replicationExecutor = replicationHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor( "replication.FPBQ", replicationHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxQueueLength, priority, conf, abortable) : null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index 6b6a995..52b3216 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -559,6 +559,75 @@ public class TestSimpleRpcScheduler { } } + @Test + public void testMetaRWScanQueues() throws Exception { + Configuration schedConf = HBaseConfiguration.create(); + schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); + schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f); + schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); + + PriorityFunction priority = mock(PriorityFunction.class); + when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.HIGH_QOS); + + RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 3, 1, priority, + HConstants.QOS_THRESHOLD); + try { + scheduler.start(); + + CallRunner putCallTask = mock(CallRunner.class); + ServerCall putCall = mock(ServerCall.class); + putCall.param = RequestConverter.buildMutateRequest( + Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); + RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); + when(putCallTask.getRpcCall()).thenReturn(putCall); + when(putCall.getHeader()).thenReturn(putHead); + when(putCall.getParam()).thenReturn(putCall.param); + + CallRunner getCallTask = mock(CallRunner.class); + ServerCall getCall = mock(ServerCall.class); + RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build(); + when(getCallTask.getRpcCall()).thenReturn(getCall); + when(getCall.getHeader()).thenReturn(getHead); + + CallRunner scanCallTask = mock(CallRunner.class); + ServerCall scanCall = mock(ServerCall.class); + scanCall.param = ScanRequest.newBuilder().build(); + RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build(); + when(scanCallTask.getRpcCall()).thenReturn(scanCall); + when(scanCall.getHeader()).thenReturn(scanHead); + when(scanCall.getParam()).thenReturn(scanCall.param); + + ArrayList<Integer> work = new ArrayList<>(); + doAnswerTaskExecution(putCallTask, work, 1, 1000); + doAnswerTaskExecution(getCallTask, work, 2, 1000); + doAnswerTaskExecution(scanCallTask, work, 3, 1000); + + // There are 3 queues: [puts], [gets], [scans] + // so the calls will be interleaved + scheduler.dispatch(putCallTask); + scheduler.dispatch(putCallTask); + scheduler.dispatch(putCallTask); + scheduler.dispatch(getCallTask); + scheduler.dispatch(getCallTask); + scheduler.dispatch(getCallTask); + scheduler.dispatch(scanCallTask); + scheduler.dispatch(scanCallTask); + scheduler.dispatch(scanCallTask); + + while (work.size() < 6) { + Thread.sleep(100); + } + + for (int i = 0; i < work.size() - 2; i += 3) { + assertNotEquals(work.get(i + 0), work.get(i + 1)); + assertNotEquals(work.get(i + 0), work.get(i + 2)); + assertNotEquals(work.get(i + 1), work.get(i + 2)); + } + } finally { + scheduler.stop(); + } + } + // Get mocked call that has the CallRunner sleep for a while so that the fast // path isn't hit. private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportOnlineRegionsRace.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportOnlineRegionsRace.java index 371897b..acad88c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportOnlineRegionsRace.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportOnlineRegionsRace.java @@ -126,6 +126,8 @@ public class TestReportOnlineRegionsRace { public static void setUp() throws Exception { UTIL.getConfiguration().setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class); UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 1000); + UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT); UTIL.startMiniCluster(1); UTIL.createTable(NAME, CF); UTIL.waitTableAvailable(NAME);