Repository: hadoop
Updated Branches:
  refs/heads/branch-2 1cd2fcf25 -> edbeefdb0
  refs/heads/trunk 189a63a71 -> 49f6e3d35


HADOOP-10597. RPC Server signals backoff to clients when all request queues are 
full. (Contributed by Ming Ma)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/49f6e3d3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/49f6e3d3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/49f6e3d3

Branch: refs/heads/trunk
Commit: 49f6e3d35e0f89637ae9ea970f249c13bdc0fd49
Parents: 189a63a
Author: Arpit Agarwal <a...@apache.org>
Authored: Thu Apr 23 09:35:04 2015 -0700
Committer: Arpit Agarwal <a...@apache.org>
Committed: Thu Apr 23 09:35:04 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  3 +
 .../hadoop/fs/CommonConfigurationKeys.java      |  2 +
 .../org/apache/hadoop/ipc/CallQueueManager.java | 20 ++++++-
 .../main/java/org/apache/hadoop/ipc/Server.java | 36 +++++++++++-
 .../apache/hadoop/ipc/metrics/RpcMetrics.java   | 10 ++++
 .../apache/hadoop/ipc/TestCallQueueManager.java |  6 +-
 .../java/org/apache/hadoop/ipc/TestRPC.java     | 58 ++++++++++++++++++++
 7 files changed, 128 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/49f6e3d3/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt 
b/hadoop-common-project/hadoop-common/CHANGES.txt
index 8b9b442..db1425f 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -505,6 +505,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-11827. Speed-up distcp buildListing() using threadpool
     (Zoran Dimitrijevic via raviprak)
 
+    HADOOP-10597. RPC Server signals backoff to clients when all request
+    queues are full. (Ming Ma via Arpit Agarwal)
+
   OPTIMIZATIONS
 
     HADOOP-11785. Reduce the number of listStatus operation in distcp

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49f6e3d3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 7575496..2721466 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -90,6 +90,8 @@ public class CommonConfigurationKeys extends 
CommonConfigurationKeysPublic {
   public static final String IPC_CALLQUEUE_NAMESPACE = "ipc";
   public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl";
   public static final String IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY = 
"identity-provider.impl";
+  public static final String IPC_BACKOFF_ENABLE = "backoff.enable";
+  public static final boolean IPC_BACKOFF_ENABLE_DEFAULT = false;
 
   /** This is for specifying the implementation for the mappings from
    * hostnames to the racks they belong to

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49f6e3d3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
index 27949d0..1568bd6 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
@@ -38,16 +38,19 @@ public class CallQueueManager<E> {
       Class<?> queneClass, Class<E> elementClass) {
     return (Class<? extends BlockingQueue<E>>)queneClass;
   }
-  
+  private final boolean clientBackOffEnabled;
+
   // Atomic refs point to active callQueue
   // We have two so we can better control swapping
   private final AtomicReference<BlockingQueue<E>> putRef;
   private final AtomicReference<BlockingQueue<E>> takeRef;
 
   public CallQueueManager(Class<? extends BlockingQueue<E>> backingClass,
-      int maxQueueSize, String namespace, Configuration conf) {
+      boolean clientBackOffEnabled, int maxQueueSize, String namespace,
+      Configuration conf) {
     BlockingQueue<E> bq = createCallQueueInstance(backingClass,
       maxQueueSize, namespace, conf);
+    this.clientBackOffEnabled = clientBackOffEnabled;
     this.putRef = new AtomicReference<BlockingQueue<E>>(bq);
     this.takeRef = new AtomicReference<BlockingQueue<E>>(bq);
     LOG.info("Using callQueue " + backingClass);
@@ -89,6 +92,10 @@ public class CallQueueManager<E> {
       " could not be constructed.");
   }
 
+  boolean isClientBackoffEnabled() {
+    return clientBackOffEnabled;
+  }
+
   /**
    * Insert e into the backing queue or block until we can.
    * If we block and the queue changes on us, we will insert while the
@@ -99,6 +106,15 @@ public class CallQueueManager<E> {
   }
 
   /**
+   * Insert e into the backing queue.
+   * Return true if e is queued.
+   * Return false if the queue is full.
+   */
+  public boolean offer(E e) throws InterruptedException {
+    return putRef.get().offer(e);
+  }
+
+  /**
    * Retrieve an E from the backing queue or block until we can.
    * Guaranteed to return an element from the current queue.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49f6e3d3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 9aa362e..5f1809a 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -503,6 +503,17 @@ public abstract class Server {
     callQueue.swapQueue(getQueueClass(prefix, conf), maxQueueSize, prefix, 
conf);
   }
 
+  /**
+   * Get from config whether client backoff is enabled under the given prefix.
+   */
+  static boolean getClientBackoffEnable(
+      String prefix, Configuration conf) {
+    String name = prefix + "." +
+        CommonConfigurationKeys.IPC_BACKOFF_ENABLE;
+    return conf.getBoolean(name,
+        CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT);
+  }
+
   /** A call queued for handling. */
   public static class Call implements Schedulable {
     private final int callId;             // the client's call id
@@ -1962,10 +1973,31 @@ public abstract class Server {
           rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
           header.getClientId().toByteArray(), traceSpan);
 
-      callQueue.put(call);              // queue the call; maybe blocked here
+      if (callQueue.isClientBackoffEnabled()) {
+        // if RPC queue is full, we will ask the RPC client to back off by
+        // throwing RetriableException. Whether RPC client will honor
+        // RetriableException and retry depends on client ipc retry policy.
+        // For example, FailoverOnNetworkExceptionRetry handles
+        // RetriableException.
+        queueRequestOrAskClientToBackOff(call);
+      } else {
+        callQueue.put(call);              // queue the call; maybe blocked here
+      }
       incRpcCount();  // Increment the rpc count
     }
 
+    private void queueRequestOrAskClientToBackOff(Call call)
+        throws WrappedRpcServerException, InterruptedException {
+      // If rpc queue is full, we will ask the client to back off.
+      boolean isCallQueued = callQueue.offer(call);
+      if (!isCallQueued) {
+        rpcMetrics.incrClientBackoff();
+        RetriableException retriableException =
+            new RetriableException("Server is too busy.");
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.ERROR_RPC_SERVER, retriableException);
+      }
+    }
 
     /**
      * Establish RPC connection setup by negotiating SASL if required, then
@@ -2293,7 +2325,7 @@ public abstract class Server {
     // Setup appropriate callqueue
     final String prefix = getQueueClassPrefix();
     this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),
-        maxQueueSize, prefix, conf);
+        getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf);
 
     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
     this.authorize = 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49f6e3d3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
index 5eba44a..e90e516 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
@@ -95,6 +95,8 @@ public class RpcMetrics {
   MutableCounterLong rpcAuthorizationFailures;
   @Metric("Number of authorization sucesses")
   MutableCounterLong rpcAuthorizationSuccesses;
+  @Metric("Number of client backoff requests")
+  MutableCounterLong rpcClientBackoff;
 
   @Metric("Number of open connections") public int numOpenConnections() {
     return server.getNumOpenConnections();
@@ -192,4 +194,12 @@ public class RpcMetrics {
       }
     }
   }
+
+  /**
+   * Increment the counter of client backoff events by one.
+   */
+  //@Override
+  public void incrClientBackoff() {
+    rpcClientBackoff.incr();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49f6e3d3/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
index 1b618b1..6e1838e 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
@@ -143,21 +143,21 @@ public class TestCallQueueManager {
 
   @Test
   public void testCallQueueCapacity() throws InterruptedException {
-    manager = new CallQueueManager<FakeCall>(queueClass, 10, "", null);
+    manager = new CallQueueManager<FakeCall>(queueClass, false, 10, "", null);
 
     assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity
   }
 
   @Test
   public void testEmptyConsume() throws InterruptedException {
-    manager = new CallQueueManager<FakeCall>(queueClass, 10, "", null);
+    manager = new CallQueueManager<FakeCall>(queueClass, false, 10, "", null);
 
     assertCanTake(manager, 0, 1); // Fails since it's empty
   }
 
   @Test(timeout=60000)
   public void testSwapUnderContention() throws InterruptedException {
-    manager = new CallQueueManager<FakeCall>(queueClass, 5000, "", null);
+    manager = new CallQueueManager<FakeCall>(queueClass, false, 5000, "", 
null);
 
     ArrayList<Putter> producers = new ArrayList<Putter>();
     ArrayList<Taker> consumers = new ArrayList<Taker>();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/49f6e3d3/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 2db8522..f049395 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -1081,6 +1081,64 @@ public class TestRPC {
     }
   }
 
+  /**
+   *  Test RPC backoff.
+   */
+  @Test (timeout=30000)
+  public void testClientBackOff() throws Exception {
+    boolean succeeded = false;
+    final int numClients = 2;
+    final List<Future<Void>> res = new ArrayList<Future<Void>>();
+    final ExecutorService executorService =
+        Executors.newFixedThreadPool(numClients);
+    final Configuration conf = new Configuration();
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    conf.setBoolean(CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE +
+        ".0." + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
+    final Server server = new RPC.Builder(conf)
+        .setProtocol(TestProtocol.class).setInstance(new TestImpl())
+        .setBindAddress(ADDRESS).setPort(0)
+        .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true)
+        .build();
+    server.start();
+
+    final TestProtocol proxy =
+        RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
+            NetUtils.getConnectAddress(server), conf);
+    try {
+      // Start a sleep RPC call to consume the only handler thread.
+      // Start another sleep RPC call to make callQueue full.
+      // With backoff enabled, the next call is rejected rather than blocked.
+      for (int i = 0; i < numClients; i++) {
+        res.add(executorService.submit(
+            new Callable<Void>() {
+              @Override
+              public Void call() throws IOException, InterruptedException {
+                proxy.sleep(100000);
+                return null;
+              }
+            }));
+      }
+      while (server.getCallQueueLen() != 1
+          && countThreads(CallQueueManager.class.getName()) != 1) {
+        Thread.sleep(100);
+      }
+      try {
+        proxy.sleep(100);
+      } catch (RemoteException e) {
+        IOException unwrapExeption = e.unwrapRemoteException();
+        if (unwrapExeption instanceof RetriableException) {
+            succeeded = true;
+        }
+      }
+    } finally {
+      server.stop();
+      RPC.stopProxy(proxy);
+      executorService.shutdown();
+    }
+    assertTrue("RetriableException not received", succeeded);
+  }
+
   public static void main(String[] args) throws IOException {
     new TestRPC().testCallsInternal(conf);
 

Reply via email to