HBASE-13318 RpcServer.getListenerAddress should handle when the accept channel is closed
Conflicts: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/bbb2f1b0 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/bbb2f1b0 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/bbb2f1b0 Branch: refs/heads/branch-1.0 Commit: bbb2f1b0035a9e659c9b7bd7b10515a66881c2ca Parents: 5424217 Author: Andrew Purtell <apurt...@apache.org> Authored: Mon Oct 26 14:31:28 2015 -0700 Committer: Andrew Purtell <apurt...@apache.org> Committed: Mon Oct 26 15:11:09 2015 -0700 ---------------------------------------------------------------------- .../hbase/ipc/IntegrationTestRpcClient.java | 22 ++++++++++++++++---- .../org/apache/hadoop/hbase/ipc/CallRunner.java | 11 ++++++---- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 17 ++++++++++----- .../hbase/regionserver/RSRpcServices.java | 6 +++++- .../apache/hadoop/hbase/ipc/TestDelayedRpc.java | 21 +++++++++++++------ .../org/apache/hadoop/hbase/ipc/TestIPC.java | 5 ++++- .../hadoop/hbase/ipc/TestProtoBufRpc.java | 6 +++++- .../hbase/ipc/TestRpcHandlerException.java | 6 +++++- .../TestRSKilledWhenInitializing.java | 8 +++++-- .../hadoop/hbase/security/TestSecureRPC.java | 11 ++++++---- .../security/token/TestTokenAuthentication.java | 6 +++++- 11 files changed, 89 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/bbb2f1b0/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java index 70538b9..c790b4d 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java @@ -162,9 +162,13 @@ public class IntegrationTestRpcClient { TestRpcServer rpcServer = new TestRpcServer(conf); rpcServer.start(); - rpcServers.put(rpcServer.getListenerAddress(), rpcServer); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } + rpcServers.put(address, rpcServer); serverList.add(rpcServer); - LOG.info("Started server: " + rpcServer.getListenerAddress()); + LOG.info("Started server: " + address); return rpcServer; } finally { lock.writeLock().unlock(); @@ -181,7 +185,13 @@ public class IntegrationTestRpcClient { int size = rpcServers.size(); int rand = random.nextInt(size); rpcServer = serverList.remove(rand); - rpcServers.remove(rpcServer.getListenerAddress()); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + // Throw exception here. We can't remove this instance from the server map because + // we no longer have access to its map key + throw new IOException("Listener channel is closed"); + } + rpcServers.remove(address); if (rpcServer != null) { stopServer(rpcServer); @@ -299,8 +309,12 @@ public class IntegrationTestRpcClient { TestRpcServer server = cluster.getRandomServer(); try { User user = User.getCurrent(); + InetSocketAddress address = server.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } ret = (EchoResponseProto) - rpcClient.callBlockingMethod(md, null, param, ret, user, server.getListenerAddress()); + rpcClient.callBlockingMethod(md, null, param, ret, user, address); } catch (Exception e) { LOG.warn(e); continue; // expected in case connection is closing or closed http://git-wip-us.apache.org/repos/asf/hbase/blob/bbb2f1b0/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index ef6fa88..51e3fba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -16,6 +16,7 @@ package org.apache.hadoop.hbase.ipc; * See the License for the specific language governing permissions and * limitations under the License. */ +import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; import org.apache.hadoop.hbase.CellScanner; @@ -95,8 +96,9 @@ public class CallRunner { TraceScope traceScope = null; try { if (!this.rpcServer.isStarted()) { - throw new ServerNotRunningYetException("Server " + rpcServer.getListenerAddress() - + " is not running yet"); + InetSocketAddress address = rpcServer.getListenerAddress(); + throw new ServerNotRunningYetException("Server " + + (address != null ? address : "(channel closed)") + " is not running yet"); } if (call.tinfo != null) { traceScope = Trace.startSpan(call.toTraceString(), call.tinfo); @@ -143,9 +145,10 @@ public class CallRunner { throw e; } } catch (ClosedChannelException cce) { + InetSocketAddress address = rpcServer.getListenerAddress(); RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " + - "this means that the server " + rpcServer.getListenerAddress() + " was processing a " + - "request but the client went away. The error message was: " + + "this means that the server " + (address != null ? address : "(channel closed)") + + " was processing a request but the client went away. The error message was: " + cce.getMessage()); } catch (Exception e) { RpcServer.LOG.warn(Thread.currentThread().getName() http://git-wip-us.apache.org/repos/asf/hbase/blob/bbb2f1b0/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index a22a15d..48ca2e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -1730,8 +1730,9 @@ public class RpcServer implements RpcServerInterface { new Call(id, this.service, null, null, null, null, this, responder, totalRequestSize, null); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); + InetSocketAddress address = getListenerAddress(); setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(), - "Call queue is full on " + getListenerAddress() + + "Call queue is full on " + (address != null ? address : "(channel closed)") + ", is hbase.ipc.server.max.callqueue.size too small?"); responder.doRespond(callTooBig); return; @@ -1759,8 +1760,9 @@ public class RpcServer implements RpcServerInterface { buf, offset, buf.length); } } catch (Throwable t) { - String msg = getListenerAddress() + " is unable to read call parameter from client " + - getHostAddress(); + InetSocketAddress address = getListenerAddress(); + String msg = (address != null ? address : "(channel closed)") + + " is unable to read call parameter from client " + getHostAddress(); LOG.warn(msg, t); // probably the hbase hadoop version does not match the running hadoop version @@ -2165,11 +2167,16 @@ public class RpcServer implements RpcServerInterface { } /** - * Return the socket (ip+port) on which the RPC server is listening to. - * @return the socket (ip+port) on which the RPC server is listening to. + * Return the socket (ip+port) on which the RPC server is listening to. May return null if + * the listener channel is closed. + * @return the socket (ip+port) on which the RPC server is listening to, or null if this + * information cannot be determined */ @Override public synchronized InetSocketAddress getListenerAddress() { + if (listener == null) { + return null; + } return listener.getAddress(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/bbb2f1b0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 5a18db1..da69595 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -813,8 +813,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } // Set our address, however we need the final port that was given to rpcServer - isa = new InetSocketAddress(initialIsa.getHostName(), rpcServer.getListenerAddress().getPort()); + isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort()); rpcServer.setErrorHandler(this); rs.setName(name); } http://git-wip-us.apache.org/repos/asf/hbase/blob/bbb2f1b0/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java index deee717..41ee4cd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java @@ -91,9 +91,12 @@ public class TestDelayedRpc { RpcClient rpcClient = RpcClientFactory.createClient( conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( - ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), - rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()), + ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()), User.getCurrent(), RPC_CLIENT_TIMEOUT); TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub = TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel); @@ -173,9 +176,12 @@ public class TestDelayedRpc { RpcClient rpcClient = RpcClientFactory.createClient( conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( - ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), - rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()), + ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()), User.getCurrent(), RPC_CLIENT_TIMEOUT); TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub = TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel); @@ -297,9 +303,12 @@ public class TestDelayedRpc { RpcClient rpcClient = RpcClientFactory.createClient( conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( - ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), - rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()), + ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()), User.getCurrent(), 1000); TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub = TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel); http://git-wip-us.apache.org/repos/asf/hbase/blob/bbb2f1b0/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index 2c70eb4..1ecb200 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -321,9 +321,12 @@ public class TestIPC { rm.add(p); try { rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); long startTime = System.currentTimeMillis(); User user = User.getCurrent(); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } for (int i = 0; i < cycles; i++) { List<CellScannable> cells = new ArrayList<CellScannable>(); // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm); http://git-wip-us.apache.org/repos/asf/hbase/blob/bbb2f1b0/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java index fc2734f..ffb3927 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java @@ -100,7 +100,11 @@ public class TestProtoBufRpc { Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), new InetSocketAddress(ADDRESS, PORT), conf, new FifoRpcScheduler(conf, 10)); - this.isa = server.getListenerAddress(); + InetSocketAddress address = server.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } + this.isa = address; this.server.start(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/bbb2f1b0/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java index 9cb1cc5..193a217 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java @@ -177,8 +177,12 @@ public class TestRpcHandlerException { rpcServer.start(); MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } client.call(null, md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)), md - .getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), 0); + .getOutputType().toProto(), User.getCurrent(), address, 0); } catch (Throwable e) { assert(abortable.isAborted() == true); } finally { http://git-wip-us.apache.org/repos/asf/hbase/blob/bbb2f1b0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java index 4ad2c31..9a48db7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -114,13 +115,16 @@ public class TestRSKilledWhenInitializing { @Override protected void handleReportForDutyResponse(RegionServerStartupResponse c) throws IOException { if (firstRS.getAndSet(false)) { + InetSocketAddress address = super.getRpcServer().getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } for (NameStringPair e : c.getMapEntriesList()) { String key = e.getName(); // The hostname the master sees us as. if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) { String hostnameFromMasterPOV = e.getValue(); - assertEquals(super.getRpcServer().getListenerAddress().getHostName(), - hostnameFromMasterPOV); + assertEquals(address.getHostName(), hostnameFromMasterPOV); } } while (!masterActive) { http://git-wip-us.apache.org/repos/asf/hbase/blob/bbb2f1b0/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java index a3cae76..e5b7996 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java @@ -22,11 +22,11 @@ import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileF import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting; import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration; import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.isKerberosPropertySetted; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; import static org.junit.Assume.assumeTrue; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; @@ -102,10 +102,13 @@ public class TestSecureRPC { RpcClient rpcClient = RpcClientFactory .createClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString()); try { + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( - ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), - rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()), - User.getCurrent(), 1000); + ServerName.valueOf(address.getHostName(), address.getPort(), + System.currentTimeMillis()), User.getCurrent(), 1000); TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub = TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel); List<Integer> results = new ArrayList<Integer>(); http://git-wip-us.apache.org/repos/asf/hbase/blob/bbb2f1b0/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java index 40df515..bae7a5f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java @@ -141,7 +141,11 @@ public class TestTokenAuthentication { AuthenticationProtos.AuthenticationService.BlockingInterface.class)); this.rpcServer = new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1)); - this.isa = this.rpcServer.getListenerAddress(); + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } + this.isa = address; this.sleeper = new Sleeper(1000, this); }