Repository: hadoop Updated Branches: refs/heads/branch-2 1cd2fcf25 -> edbeefdb0 refs/heads/trunk 189a63a71 -> 49f6e3d35
HADOOP-10597. RPC Server signals backoff to clients when all request queues are full. (Contributed by Ming Ma) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/49f6e3d3 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/49f6e3d3 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/49f6e3d3 Branch: refs/heads/trunk Commit: 49f6e3d35e0f89637ae9ea970f249c13bdc0fd49 Parents: 189a63a Author: Arpit Agarwal <a...@apache.org> Authored: Thu Apr 23 09:35:04 2015 -0700 Committer: Arpit Agarwal <a...@apache.org> Committed: Thu Apr 23 09:35:04 2015 -0700 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 + .../hadoop/fs/CommonConfigurationKeys.java | 2 + .../org/apache/hadoop/ipc/CallQueueManager.java | 20 ++++++- .../main/java/org/apache/hadoop/ipc/Server.java | 36 +++++++++++- .../apache/hadoop/ipc/metrics/RpcMetrics.java | 10 ++++ .../apache/hadoop/ipc/TestCallQueueManager.java | 6 +- .../java/org/apache/hadoop/ipc/TestRPC.java | 58 ++++++++++++++++++++ 7 files changed, 128 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/49f6e3d3/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 8b9b442..db1425f 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -505,6 +505,9 @@ Release 2.8.0 - UNRELEASED HADOOP-11827. Speed-up distcp buildListing() using threadpool (Zoran Dimitrijevic via raviprak) + HADOOP-10597. RPC Server signals backoff to clients when all request + queues are full. (Ming Ma via Arpit Agarwal) + OPTIMIZATIONS HADOOP-11785. Reduce the number of listStatus operation in distcp http://git-wip-us.apache.org/repos/asf/hadoop/blob/49f6e3d3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 7575496..2721466 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -90,6 +90,8 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { public static final String IPC_CALLQUEUE_NAMESPACE = "ipc"; public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl"; public static final String IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY = "identity-provider.impl"; + public static final String IPC_BACKOFF_ENABLE = "backoff.enable"; + public static final boolean IPC_BACKOFF_ENABLE_DEFAULT = false; /** This is for specifying the implementation for the mappings from * hostnames to the racks they belong to http://git-wip-us.apache.org/repos/asf/hadoop/blob/49f6e3d3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java index 27949d0..1568bd6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java @@ -38,16 +38,19 @@ public class CallQueueManager<E> { Class<?> queneClass, Class<E> elementClass) { return (Class<? extends BlockingQueue<E>>)queneClass; } - + private final boolean clientBackOffEnabled; + // Atomic refs point to active callQueue // We have two so we can better control swapping private final AtomicReference<BlockingQueue<E>> putRef; private final AtomicReference<BlockingQueue<E>> takeRef; public CallQueueManager(Class<? extends BlockingQueue<E>> backingClass, - int maxQueueSize, String namespace, Configuration conf) { + boolean clientBackOffEnabled, int maxQueueSize, String namespace, + Configuration conf) { BlockingQueue<E> bq = createCallQueueInstance(backingClass, maxQueueSize, namespace, conf); + this.clientBackOffEnabled = clientBackOffEnabled; this.putRef = new AtomicReference<BlockingQueue<E>>(bq); this.takeRef = new AtomicReference<BlockingQueue<E>>(bq); LOG.info("Using callQueue " + backingClass); @@ -89,6 +92,10 @@ public class CallQueueManager<E> { " could not be constructed."); } + boolean isClientBackoffEnabled() { + return clientBackOffEnabled; + } + /** * Insert e into the backing queue or block until we can. * If we block and the queue changes on us, we will insert while the @@ -99,6 +106,15 @@ public class CallQueueManager<E> { } /** + * Insert e into the backing queue. + * Return true if e is queued. + * Return false if the queue is full. + */ + public boolean offer(E e) throws InterruptedException { + return putRef.get().offer(e); + } + + /** * Retrieve an E from the backing queue or block until we can. * Guaranteed to return an element from the current queue. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/49f6e3d3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 9aa362e..5f1809a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -503,6 +503,17 @@ public abstract class Server { callQueue.swapQueue(getQueueClass(prefix, conf), maxQueueSize, prefix, conf); } + /** + * Get from config if client backoff is enabled on that port. + */ + static boolean getClientBackoffEnable( + String prefix, Configuration conf) { + String name = prefix + "." + + CommonConfigurationKeys.IPC_BACKOFF_ENABLE; + return conf.getBoolean(name, + CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT); + } + /** A call queued for handling. */ public static class Call implements Schedulable { private final int callId; // the client's call id @@ -1962,10 +1973,31 @@ public abstract class Server { rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header.getClientId().toByteArray(), traceSpan); - callQueue.put(call); // queue the call; maybe blocked here + if (callQueue.isClientBackoffEnabled()) { + // if RPC queue is full, we will ask the RPC client to back off by + // throwing RetriableException. Whether RPC client will honor + // RetriableException and retry depends on client ipc retry policy. + // For example, FailoverOnNetworkExceptionRetry handles + // RetriableException. + queueRequestOrAskClientToBackOff(call); + } else { + callQueue.put(call); // queue the call; maybe blocked here + } incRpcCount(); // Increment the rpc count } + private void queueRequestOrAskClientToBackOff(Call call) + throws WrappedRpcServerException, InterruptedException { + // If rpc queue is full, we will ask the client to back off. + boolean isCallQueued = callQueue.offer(call); + if (!isCallQueued) { + rpcMetrics.incrClientBackoff(); + RetriableException retriableException = + new RetriableException("Server is too busy."); + throw new WrappedRpcServerException( + RpcErrorCodeProto.ERROR_RPC_SERVER, retriableException); + } + } /** * Establish RPC connection setup by negotiating SASL if required, then @@ -2293,7 +2325,7 @@ public abstract class Server { // Setup appropriate callqueue final String prefix = getQueueClassPrefix(); this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf), - maxQueueSize, prefix, conf); + getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf); this.secretManager = (SecretManager<TokenIdentifier>) secretManager; this.authorize = http://git-wip-us.apache.org/repos/asf/hadoop/blob/49f6e3d3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java index 5eba44a..e90e516 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java @@ -95,6 +95,8 @@ public class RpcMetrics { MutableCounterLong rpcAuthorizationFailures; @Metric("Number of authorization sucesses") MutableCounterLong rpcAuthorizationSuccesses; + @Metric("Number of client backoff requests") + MutableCounterLong rpcClientBackoff; @Metric("Number of open connections") public int numOpenConnections() { return server.getNumOpenConnections(); @@ -192,4 +194,12 @@ public class RpcMetrics { } } } + + /** + * One client backoff event + */ + //@Override + public void incrClientBackoff() { + rpcClientBackoff.incr(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/49f6e3d3/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java index 1b618b1..6e1838e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java @@ -143,21 +143,21 @@ public class TestCallQueueManager { @Test public void testCallQueueCapacity() throws InterruptedException { - manager = new CallQueueManager<FakeCall>(queueClass, 10, "", null); + manager = new CallQueueManager<FakeCall>(queueClass, false, 10, "", null); assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity } @Test public void testEmptyConsume() throws InterruptedException { - manager = new CallQueueManager<FakeCall>(queueClass, 10, "", null); + manager = new CallQueueManager<FakeCall>(queueClass, false, 10, "", null); assertCanTake(manager, 0, 1); // Fails since it's empty } @Test(timeout=60000) public void testSwapUnderContention() throws InterruptedException { - manager = new CallQueueManager<FakeCall>(queueClass, 5000, "", null); + manager = new CallQueueManager<FakeCall>(queueClass, false, 5000, "", null); ArrayList<Putter> producers = new ArrayList<Putter>(); ArrayList<Taker> consumers = new ArrayList<Taker>(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/49f6e3d3/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 2db8522..f049395 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -1081,6 +1081,64 @@ public class TestRPC { } } + /** + * Test RPC backoff. + */ + @Test (timeout=30000) + public void testClientBackOff() throws Exception { + boolean succeeded = false; + final int numClients = 2; + final List<Future<Void>> res = new ArrayList<Future<Void>>(); + final ExecutorService executorService = + Executors.newFixedThreadPool(numClients); + final Configuration conf = new Configuration(); + conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); + conf.setBoolean(CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + + ".0." + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true); + final Server server = new RPC.Builder(conf) + .setProtocol(TestProtocol.class).setInstance(new TestImpl()) + .setBindAddress(ADDRESS).setPort(0) + .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true) + .build(); + server.start(); + + final TestProtocol proxy = + RPC.getProxy(TestProtocol.class, TestProtocol.versionID, + NetUtils.getConnectAddress(server), conf); + try { + // start a sleep RPC call to consume the only handler thread. + // Start another sleep RPC call to make callQueue full. + // Start another sleep RPC call to make reader thread block on CallQueue. + for (int i = 0; i < numClients; i++) { + res.add(executorService.submit( + new Callable<Void>() { + @Override + public Void call() throws IOException, InterruptedException { + proxy.sleep(100000); + return null; + } + })); + } + while (server.getCallQueueLen() != 1 + && countThreads(CallQueueManager.class.getName()) != 1) { + Thread.sleep(100); + } + try { + proxy.sleep(100); + } catch (RemoteException e) { + IOException unwrapExeption = e.unwrapRemoteException(); + if (unwrapExeption instanceof RetriableException) { + succeeded = true; + } + } + } finally { + server.stop(); + RPC.stopProxy(proxy); + executorService.shutdown(); + } + assertTrue("RetriableException not received", succeeded); + } + public static void main(String[] args) throws IOException { new TestRPC().testCallsInternal(conf);