HBASE-15957 RpcClientImpl.close never ends in some circumstances Signed-off-by: Enis Soztutar <e...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/da88b482 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/da88b482 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/da88b482 Branch: refs/heads/hbase-12439 Commit: da88b4824054f57fbcbc7795469ab2369a39b5ed Parents: 376ad0d Author: Sergey Soldatov <s...@apache.org> Authored: Sun Jun 5 23:46:03 2016 -0700 Committer: Enis Soztutar <e...@apache.org> Committed: Tue Jun 7 11:33:03 2016 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hbase/ipc/RpcClientImpl.java | 5 ++- .../hbase/ipc/IntegrationTestRpcClient.java | 35 ++++++++++++++++---- 2 files changed, 31 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/da88b482/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java index d8c87e9..dc05af1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java @@ -1202,9 +1202,8 @@ public class RpcClientImpl extends AbstractRpcClient { } if (connsToClose != null) { for (Connection conn : connsToClose) { - if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) { - conn.close(); - } + conn.markClosed(new InterruptedIOException("RpcClient is closing")); + conn.close(); } } // wait until all connections are closed http://git-wip-us.apache.org/repos/asf/hbase/blob/da88b482/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java index c28f3e6..6c0fbcc 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.ipc; +import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -41,12 +42,6 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.ipc.AbstractRpcClient; -import org.apache.hadoop.hbase.ipc.AsyncRpcClient; -import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; -import org.apache.hadoop.hbase.ipc.RpcClientImpl; -import org.apache.hadoop.hbase.ipc.RpcScheduler; -import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; @@ -290,6 +285,7 @@ public class IntegrationTestRpcClient { static class SimpleClient extends Thread { AbstractRpcClient rpcClient; AtomicBoolean running = new AtomicBoolean(true); + AtomicBoolean sending = new AtomicBoolean(false); AtomicReference<Throwable> exception = new AtomicReference<>(null); Cluster cluster; String id; @@ -319,6 +315,7 @@ public class IntegrationTestRpcClient { if (address == null) { throw new IOException("Listener channel is closed"); } + sending.set(true); ret = (EchoResponseProto) rpcClient.callBlockingMethod(md, null, param, ret, user, address); } catch (Exception e) { @@ -340,6 +337,9 @@ public class IntegrationTestRpcClient { void stopRunning() { running.set(false); } + boolean isSending() { + return sending.get(); + } void rethrowException() throws Throwable { if (exception.get() != null) { @@ -348,6 +348,29 @@ public class IntegrationTestRpcClient { } } + /* + Test that not started connections are successfully removed from connection pool when + rpc client is closing. + */ + @Test (timeout = 30000) + public void testRpcWithWriteThread() throws IOException, InterruptedException { + LOG.info("Starting test"); + Cluster cluster = new Cluster(1, 1); + cluster.startServer(); + conf.setBoolean(SPECIFIC_WRITE_THREAD, true); + for(int i = 0; i <1000; i++) { + AbstractRpcClient rpcClient = createRpcClient(conf, true); + SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1"); + client.start(); + while(!client.isSending()) { + Thread.sleep(1); + } + client.stopRunning(); + rpcClient.close(); + } + } + + @Test (timeout = 900000) public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable { for (int i = 0; i < numIterations; i++) {