Repository: hadoop Updated Branches: refs/heads/branch-2.6.1 95edb6e64 -> 4ec7b6174
HADOOP-11295. RPC Server Reader thread can't shutdown if RPCCallQueue is full. Contributed by Ming Ma. (cherry picked from commit 685af8a3d0504724fe588daf3722519fedc45b01) (cherry picked from commit 6c01e586198a3c3ebaa7561778c124ae62553246) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4ec7b617 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4ec7b617 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4ec7b617 Branch: refs/heads/branch-2.6.1 Commit: 4ec7b6174df4db30eb0d7354cd5ad0f40ab874dd Parents: 95edb6e Author: Kihwal Lee <[email protected]> Authored: Tue Feb 17 17:14:58 2015 -0600 Committer: Vinod Kumar Vavilapalli <[email protected]> Committed: Mon Aug 31 18:10:00 2015 -0700 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 + .../main/java/org/apache/hadoop/ipc/Server.java | 3 +- .../java/org/apache/hadoop/ipc/TestRPC.java | 68 ++++++++++++++++++++ 3 files changed, 73 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ec7b617/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 6e56370..5891feb 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -39,6 +39,9 @@ Release 2.6.1 - UNRELEASED HADOOP-11482. Use correct UGI when KMSClientProvider is called by a proxy user. Contributed by Arun Suresh. + HADOOP-11295. RPC Server Reader thread can't shutdown if RPCCallQueue is + full. (Ming Ma via kihwal) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ec7b617/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 127f22b..43d76a1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -662,7 +662,8 @@ public abstract class Server { assert !running; readSelector.wakeup(); try { - join(); + super.interrupt(); + super.join(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ec7b617/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index f1855f62..8a4dcb6 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -38,9 +38,16 @@ import java.lang.reflect.Proxy; import java.net.ConnectException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -1009,6 +1016,67 @@ public class TestRPC { } } + /** + * Verify the RPC server can shutdown properly when callQueue is full. + */ + @Test (timeout=30000) + public void testRPCServerShutdown() throws Exception { + final int numClients = 3; + final List<Future<Void>> res = new ArrayList<Future<Void>>(); + final ExecutorService executorService = + Executors.newFixedThreadPool(numClients); + final Configuration conf = new Configuration(); + conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); + final Server server = new RPC.Builder(conf) + .setProtocol(TestProtocol.class).setInstance(new TestImpl()) + .setBindAddress(ADDRESS).setPort(0) + .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true) + .build(); + server.start(); + + final TestProtocol proxy = + RPC.getProxy(TestProtocol.class, TestProtocol.versionID, + NetUtils.getConnectAddress(server), conf); + try { + // start a sleep RPC call to consume the only handler thread. + // Start another sleep RPC call to make callQueue full. + // Start another sleep RPC call to make reader thread block on CallQueue. + for (int i = 0; i < numClients; i++) { + res.add(executorService.submit( + new Callable<Void>() { + @Override + public Void call() throws IOException, InterruptedException { + proxy.sleep(100000); + return null; + } + })); + } + while (server.getCallQueueLen() != 1 + && countThreads(CallQueueManager.class.getName()) != 1 + && countThreads(TestProtocol.class.getName()) != 1) { + Thread.sleep(100); + } + } finally { + try { + server.stop(); + assertEquals("Not enough clients", numClients, res.size()); + for (Future<Void> f : res) { + try { + f.get(); + fail("Future get should not return"); + } catch (ExecutionException e) { + assertTrue("Unexpected exception: " + e, + e.getCause() instanceof IOException); + LOG.info("Expected exception", e.getCause()); + } + } + } finally { + RPC.stopProxy(proxy); + executorService.shutdown(); + } + } + } + public static void main(String[] args) throws IOException { new TestRPC().testCallsInternal(conf);
