http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 99bfc61..2ebd1c5 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -43,8 +43,10 @@ import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.Service;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.MetricsAsserts;
 import org.apache.hadoop.test.MockitoUtil;
+import org.apache.log4j.Level;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -956,7 +958,7 @@ public class TestRPC extends TestRpcBase {
   }
 
   /**
-   *  Test RPC backoff.
+   *  Test RPC backoff by queue full.
    */
   @Test (timeout=30000)
   public void testClientBackOff() throws Exception {
@@ -969,7 +971,7 @@ public class TestRPC extends TestRpcBase {
     final ExecutorService executorService =
         Executors.newFixedThreadPool(numClients);
     conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
-    conf.setBoolean(CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE +
+    conf.setBoolean(CommonConfigurationKeys.IPC_NAMESPACE +
         ".0." + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
     RPC.Builder builder = newServerBuilder(conf)
         .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true);
@@ -1019,6 +1021,92 @@ public class TestRPC extends TestRpcBase {
   }
 
   /**
+   *  Test RPC backoff by response time of each priority level.
+   */
+  @Test (timeout=30000)
+  public void testClientBackOffByResponseTime() throws Exception {
+    Server server;
+    final TestRpcService proxy;
+    boolean succeeded = false;
+    final int numClients = 1;
+    final int queueSizePerHandler = 3;
+
+    GenericTestUtils.setLogLevel(DecayRpcScheduler.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(RPC.LOG, Level.DEBUG);
+
+    final List<Future<Void>> res = new ArrayList<Future<Void>>();
+    final ExecutorService executorService =
+        Executors.newFixedThreadPool(numClients);
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0.";
+    conf.setBoolean(ns + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
+    conf.setStrings(ns + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY,
+        "org.apache.hadoop.ipc.FairCallQueue");
+    conf.setStrings(ns + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
+        "org.apache.hadoop.ipc.DecayRpcScheduler");
+    conf.setInt(ns + CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY,
+        2);
+    conf.setBoolean(ns +
+        DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_KEY,
+        true);
+    // set a small thresholds 2s and 4s for level 0 and level 1 for testing
+    conf.set(ns +
+        DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY
+        , "2s, 4s");
+
+    // Set max queue size to 3 so that 2 calls from the test won't trigger
+    // back off because the queue is full.
+    RPC.Builder builder = newServerBuilder(conf)
+        .setQueueSizePerHandler(queueSizePerHandler).setNumHandlers(1)
+        .setVerbose(true);
+    server = setupTestServer(builder);
+
+    @SuppressWarnings("unchecked")
+    CallQueueManager<Call> spy = spy((CallQueueManager<Call>) Whitebox
+        .getInternalState(server, "callQueue"));
+    Whitebox.setInternalState(server, "callQueue", spy);
+
+    Exception lastException = null;
+    proxy = getClient(addr, conf);
+    try {
+      // start a sleep RPC call that sleeps 3s.
+      for (int i = 0; i < numClients; i++) {
+        res.add(executorService.submit(
+            new Callable<Void>() {
+              @Override
+              public Void call() throws ServiceException, InterruptedException {
+                proxy.sleep(null, newSleepRequest(3000));
+                return null;
+              }
+            }));
+        verify(spy, timeout(500).times(i + 1)).offer(Mockito.<Call>anyObject());
+      }
+      // Start another sleep RPC call and verify the call is backed off due to
+      // avg response time(3s) exceeds threshold (2s).
+      try {
+        // wait for the 1st response time update
+        Thread.sleep(5500);
+        proxy.sleep(null, newSleepRequest(100));
+      } catch (ServiceException e) {
+        RemoteException re = (RemoteException) e.getCause();
+        IOException unwrapExeption = re.unwrapRemoteException();
+        if (unwrapExeption instanceof RetriableException) {
+          succeeded = true;
+        } else {
+          lastException = unwrapExeption;
+        }
+      }
+    } finally {
+      executorService.shutdown();
+      stop(server, proxy);
+    }
+    if (lastException != null) {
+      LOG.error("Last received non-RetriableException:", lastException);
+    }
+    assertTrue("RetriableException not received", succeeded);
+  }
+
+  /**
    *  Test RPC timeout.
    */
   @Test(timeout=30000)

Reply via email to