http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
----------------------------------------------------------------------
diff --git
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 99bfc61..2ebd1c5 100644
---
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -43,8 +43,10 @@ import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.test.MockitoUtil;
+import org.apache.log4j.Level;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -956,7 +958,7 @@ public class TestRPC extends TestRpcBase {
}
/**
- * Test RPC backoff.
+ * Test RPC backoff when the call queue is full.
*/
@Test (timeout=30000)
public void testClientBackOff() throws Exception {
@@ -969,7 +971,7 @@ public class TestRPC extends TestRpcBase {
final ExecutorService executorService =
Executors.newFixedThreadPool(numClients);
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
- conf.setBoolean(CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE +
+ conf.setBoolean(CommonConfigurationKeys.IPC_NAMESPACE +
".0." + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
RPC.Builder builder = newServerBuilder(conf)
.setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true);
@@ -1019,6 +1021,92 @@ public class TestRPC extends TestRpcBase {
}
/**
+ * Test RPC backoff triggered by the average response time of a priority level exceeding its configured threshold; passes only if the second call fails with a RetriableException.
+ */
+ @Test (timeout=30000)
+ public void testClientBackOffByResponseTime() throws Exception {
+ Server server;
+ final TestRpcService proxy;
+ boolean succeeded = false; // set to true once the expected RetriableException is observed
+ final int numClients = 1;
+ final int queueSizePerHandler = 3;
+
+ GenericTestUtils.setLogLevel(DecayRpcScheduler.LOG, Level.DEBUG);
+ GenericTestUtils.setLogLevel(RPC.LOG, Level.DEBUG);
+
+ final List<Future<Void>> res = new ArrayList<Future<Void>>();
+ final ExecutorService executorService =
+ Executors.newFixedThreadPool(numClients);
+ conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+ final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0."; // common prefix for every key set below
+ conf.setBoolean(ns + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
+ conf.setStrings(ns + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY,
+ "org.apache.hadoop.ipc.FairCallQueue");
+ conf.setStrings(ns + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
+ "org.apache.hadoop.ipc.DecayRpcScheduler");
+ conf.setInt(ns + CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY,
+ 2);
+ conf.setBoolean(ns +
+ DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_KEY,
+ true);
+ // Use small response-time thresholds for testing: 2s for level 0 and 4s for level 1.
+ conf.set(ns +
+
DecayRpcScheduler.IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY
+ , "2s, 4s");
+
+ // Set the queue size per handler to 3 so that the 2 calls made by this test
+ // cannot trigger backoff merely because the queue is full.
+ RPC.Builder builder = newServerBuilder(conf)
+ .setQueueSizePerHandler(queueSizePerHandler).setNumHandlers(1)
+ .setVerbose(true);
+ server = setupTestServer(builder);
+
+ @SuppressWarnings("unchecked")
+ CallQueueManager<Call> spy = spy((CallQueueManager<Call>) Whitebox
+ .getInternalState(server, "callQueue"));
+ Whitebox.setInternalState(server, "callQueue", spy); // swap in the spy so offer() calls can be verified below
+
+ Exception lastException = null; // holds a non-RetriableException failure, logged after cleanup
+ proxy = getClient(addr, conf);
+ try {
+ // Submit numClients (1) background sleep RPC call(s) of 3s each and verify each one is offered to the call queue within 500 ms.
+ for (int i = 0; i < numClients; i++) {
+ res.add(executorService.submit(
+ new Callable<Void>() {
+ @Override
+ public Void call() throws ServiceException, InterruptedException
{
+ proxy.sleep(null, newSleepRequest(3000));
+ return null;
+ }
+ }));
+ verify(spy, timeout(500).times(i +
1)).offer(Mockito.<Call>anyObject());
+ }
+ // Start another sleep RPC call and verify that it is backed off because
+ // the average response time (3s) exceeds the threshold (2s).
+ try {
+ // Wait for the first response-time update before issuing the second call.
+ Thread.sleep(5500);
+ proxy.sleep(null, newSleepRequest(100));
+ } catch (ServiceException e) {
+ RemoteException re = (RemoteException) e.getCause(); // NOTE(review): unchecked cast; throws ClassCastException if the cause is not a RemoteException
+ IOException unwrapExeption = re.unwrapRemoteException();
+ if (unwrapExeption instanceof RetriableException) {
+ succeeded = true;
+ } else {
+ lastException = unwrapExeption;
+ }
+ }
+ } finally {
+ executorService.shutdown(); // NOTE(review): the futures collected in res are never awaited or checked
+ stop(server, proxy);
+ }
+ if (lastException != null) { // log the unexpected exception before the assertion below
+ LOG.error("Last received non-RetriableException:", lastException);
+ }
+ assertTrue("RetriableException not received", succeeded);
+ }
+
+ /**
* Test RPC timeout.
*/
@Test(timeout=30000)