Repository: hbase
Updated Branches:
  refs/heads/0.98 f21501496 -> f321d7ec7
  refs/heads/branch-1 c3d4d0233 -> 6fbcd0a2e
  refs/heads/branch-1.0 54242171d -> bbb2f1b00
  refs/heads/branch-1.1 9b71dac49 -> b75605817
  refs/heads/branch-1.2 2f3e98b7e -> c706d42e9
  refs/heads/master 928dade1d -> efb82957d
HBASE-13318 RpcServer.getListenerAddress should handle when the accept channel is closed Conflicts: hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f321d7ec Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f321d7ec Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f321d7ec Branch: refs/heads/0.98 Commit: f321d7ec7f2aecfb443619b1fad1dffbf703c420 Parents: f215014 Author: Andrew Purtell <apurt...@apache.org> Authored: Mon Oct 26 14:31:28 2015 -0700 Committer: Andrew Purtell <apurt...@apache.org> Committed: Mon Oct 26 14:52:15 2015 -0700 ---------------------------------------------------------------------- .../hbase/ipc/IntegrationTestRpcClient.java | 22 ++++++++++++++++---- .../org/apache/hadoop/hbase/ipc/CallRunner.java | 11 ++++++---- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 17 ++++++++++----- .../apache/hadoop/hbase/ipc/TestDelayedRpc.java | 21 +++++++++++++------ .../org/apache/hadoop/hbase/ipc/TestIPC.java | 10 +++++++-- .../hadoop/hbase/ipc/TestProtoBufRpc.java | 6 +++++- .../hadoop/hbase/security/TestSecureRPC.java | 13 +++++++----- .../security/token/TestTokenAuthentication.java | 6 +++++- 8 files changed, 78 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/f321d7ec/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java ---------------------------------------------------------------------- diff --git 
a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java index 6dfef9c..4317815 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java @@ -161,9 +161,13 @@ public class IntegrationTestRpcClient { TestRpcServer rpcServer = new TestRpcServer(conf); rpcServer.start(); - rpcServers.put(rpcServer.getListenerAddress(), rpcServer); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } + rpcServers.put(address, rpcServer); serverList.add(rpcServer); - LOG.info("Started server: " + rpcServer.getListenerAddress()); + LOG.info("Started server: " + address); return rpcServer; } finally { lock.writeLock().unlock(); @@ -180,7 +184,13 @@ public class IntegrationTestRpcClient { int size = rpcServers.size(); int rand = random.nextInt(size); rpcServer = serverList.remove(rand); - rpcServers.remove(rpcServer.getListenerAddress()); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + // Throw exception here. 
We can't remove this instance from the server map because + // we no longer have access to its map key + throw new IOException("Listener channel is closed"); + } + rpcServers.remove(address); if (rpcServer != null) { stopServer(rpcServer); @@ -298,8 +308,12 @@ public class IntegrationTestRpcClient { TestRpcServer server = cluster.getRandomServer(); try { User user = User.getCurrent(); + InetSocketAddress address = server.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } ret = (EchoResponseProto) - rpcClient.callBlockingMethod(md, null, param, ret, user, server.getListenerAddress(), 60000); + rpcClient.callBlockingMethod(md, null, param, ret, user, address, 60000); } catch (Exception e) { LOG.warn(e); continue; // expected in case connection is closing or closed http://git-wip-us.apache.org/repos/asf/hbase/blob/f321d7ec/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index 142f005..60619bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -16,6 +16,7 @@ package org.apache.hadoop.hbase.ipc; * See the License for the specific language governing permissions and * limitations under the License. 
*/ +import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -91,8 +92,9 @@ public class CallRunner { TraceScope traceScope = null; try { if (!this.rpcServer.isStarted()) { - throw new ServerNotRunningYetException("Server " + rpcServer.getListenerAddress() - + " is not running yet"); + InetSocketAddress address = rpcServer.getListenerAddress(); + throw new ServerNotRunningYetException("Server " + + (address != null ? address : "(channel closed)") + " is not running yet"); } if (call.tinfo != null) { traceScope = Trace.startSpan(call.toTraceString(), call.tinfo); @@ -134,9 +136,10 @@ public class CallRunner { throw e; } } catch (ClosedChannelException cce) { + InetSocketAddress address = rpcServer.getListenerAddress(); RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " + - "this means that the server " + rpcServer.getListenerAddress() + " was processing a " + - "request but the client went away. The error message was: " + + "this means that the server " + (address != null ? address : "(channel closed)") + + " was processing a request but the client went away. 
The error message was: " + cce.getMessage()); } catch (Exception e) { RpcServer.LOG.warn(Thread.currentThread().getName() http://git-wip-us.apache.org/repos/asf/hbase/blob/f321d7ec/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index c37411b..ceb39f9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -1760,8 +1760,9 @@ public class RpcServer implements RpcServerInterface { responder, totalRequestSize, null, null); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION); + InetSocketAddress address = getListenerAddress(); setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION, - "Call queue is full on " + getListenerAddress() + + "Call queue is full on " + (address != null ? address : "(channel closed)") + ", is hbase.ipc.server.max.callqueue.size too small?"); responder.doRespond(callTooBig); return; @@ -1789,8 +1790,9 @@ public class RpcServer implements RpcServerInterface { buf, offset, buf.length); } } catch (Throwable t) { - String msg = getListenerAddress() + " is unable to read call parameter from client " + - getHostAddress(); + InetSocketAddress address = getListenerAddress(); + String msg = (address != null ? address : "(channel closed)") + + " is unable to read call parameter from client " + getHostAddress(); LOG.warn(msg, t); metrics.exception(t); @@ -2244,11 +2246,16 @@ public class RpcServer implements RpcServerInterface { } /** - * Return the socket (ip+port) on which the RPC server is listening to. - * @return the socket (ip+port) on which the RPC server is listening to. 
+ * Return the socket (ip+port) on which the RPC server is listening to. May return null if + * the listener channel is closed. + * @return the socket (ip+port) on which the RPC server is listening to, or null if this + * information cannot be determined */ @Override public synchronized InetSocketAddress getListenerAddress() { + if (listener == null) { + return null; + } return listener.getAddress(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/f321d7ec/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java index abe1621..9c2f6ee 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java @@ -90,9 +90,12 @@ public class TestDelayedRpc { rpcServer.start(); RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( - ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), - rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()), + ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()), User.getCurrent(), RPC_CLIENT_TIMEOUT); TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub = TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel); @@ -171,9 +174,12 @@ public class TestDelayedRpc { rpcServer.start(); RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new 
IOException("Listener channel is closed"); + } BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( - ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), - rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()), + ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()), User.getCurrent(), RPC_CLIENT_TIMEOUT); TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub = TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel); @@ -294,9 +300,12 @@ public class TestDelayedRpc { rpcServer.start(); RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( - ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), - rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()), + ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()), User.getCurrent(), 1000); TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub = TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel); http://git-wip-us.apache.org/repos/asf/hbase/blob/f321d7ec/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index 7dcc37a..9a36440 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -227,9 +227,12 @@ public class TestIPC { for (int i = 0; i < count; i++) cells.add(CELL); try { rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); MethodDescriptor 
md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } Pair<Message, CellScanner> r = client.call(md, param, CellUtil.createCellScanner(cells), md.getOutputType().toProto(), User.getCurrent(), address, 0); int index = 0; @@ -322,9 +325,12 @@ public class TestIPC { rm.add(p); try { rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); long startTime = System.currentTimeMillis(); User user = User.getCurrent(); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } for (int i = 0; i < cycles; i++) { List<CellScannable> cells = new ArrayList<CellScannable>(); // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm); http://git-wip-us.apache.org/repos/asf/hbase/blob/f321d7ec/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java index 63395af..f03ee19 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java @@ -100,7 +100,11 @@ public class TestProtoBufRpc { Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), new InetSocketAddress(ADDRESS, PORT), conf, new FifoRpcScheduler(conf, 10)); - this.isa = server.getListenerAddress(); + InetSocketAddress address = server.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } + this.isa = address; 
this.server.start(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/f321d7ec/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java index 0beb6dc..735a121 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java @@ -22,11 +22,11 @@ import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileF import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting; import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration; import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.isKerberosPropertySetted; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; import static org.junit.Assume.assumeTrue; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; @@ -100,10 +100,13 @@ public class TestSecureRPC { rpcServer.start(); RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( - ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), - rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()), - User.getCurrent(), 1000); + ServerName.valueOf(address.getHostName(), address.getPort(), + System.currentTimeMillis()), User.getCurrent(), 1000); TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub = 
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel); List<Integer> results = new ArrayList<Integer>(); @@ -117,4 +120,4 @@ public class TestSecureRPC { rpcClient.stop(); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/f321d7ec/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java index b5ae760..5c505df 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java @@ -137,7 +137,11 @@ public class TestTokenAuthentication { AuthenticationProtos.AuthenticationService.BlockingInterface.class)); this.rpcServer = new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1)); - this.isa = this.rpcServer.getListenerAddress(); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } + this.isa = address; this.sleeper = new Sleeper(1000, this); }