HADOOP-10940. RPC client does no bounds checking of responses. Contributed by Daryn Sharp.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d4d07687 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d4d07687 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d4d07687 Branch: refs/heads/HDFS-10285 Commit: d4d076876a8d0002bd3a73491d8459d11cb4896c Parents: baab489 Author: Kihwal Lee <kih...@apache.org> Authored: Fri Sep 9 10:39:35 2016 -0500 Committer: Kihwal Lee <kih...@apache.org> Committed: Fri Sep 9 10:39:35 2016 -0500 ---------------------------------------------------------------------- .../dev-support/findbugsExcludeFile.xml | 10 +- .../hadoop/fs/CommonConfigurationKeys.java | 12 +- .../main/java/org/apache/hadoop/ipc/Client.java | 178 +++++++++++++------ .../apache/hadoop/security/SaslRpcClient.java | 25 +-- .../src/main/resources/core-default.xml | 17 +- .../java/org/apache/hadoop/ipc/TestIPC.java | 86 ++++++++- 6 files changed, 246 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4d07687/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml index b650eae..ec7c396 100644 --- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml +++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml @@ -44,7 +44,7 @@ --> <Match> <Class name="org.apache.hadoop.ipc.Client$Connection" /> - <Field name="out" /> + <Field name="ipcStreams" /> <Bug pattern="IS2_INCONSISTENT_SYNC" /> </Match> <!-- @@ -341,13 +341,7 @@ <Method name="removeRenewAction" /> <Bug pattern="BC_UNCONFIRMED_CAST" /> </Match> - - <!-- Inconsistent synchronization flagged by findbugs is not valid. 
--> - <Match> - <Class name="org.apache.hadoop.ipc.Client$Connection" /> - <Field name="in" /> - <Bug pattern="IS2_INCONSISTENT_SYNC" /> - </Match> + <!-- The switch condition for INITIATE is expected to fallthru to RESPONSE to process initial sasl response token included in the INITIATE http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4d07687/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 76cf9cc..2b530f0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -78,12 +78,20 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { /** Default value for IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE */ public static final int IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT = 100; - + + /** Max request size a server will accept. */ public static final String IPC_MAXIMUM_DATA_LENGTH = "ipc.maximum.data.length"; - + /** Default value for IPC_MAXIMUM_DATA_LENGTH. */ public static final int IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 64 * 1024 * 1024; + /** Max response size a client will accept. */ + public static final String IPC_MAXIMUM_RESPONSE_LENGTH = + "ipc.maximum.response.length"; + /** Default value for IPC_MAXIMUM_RESPONSE_LENGTH. */ + public static final int IPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT = + 128 * 1024 * 1024; + /** How many calls per handler are allowed in the queue. 
*/ public static final String IPC_SERVER_HANDLER_QUEUE_SIZE_KEY = "ipc.server.handler.queue.size"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4d07687/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 567b932..94f1945 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; @@ -413,8 +414,8 @@ public class Client implements AutoCloseable { private SaslRpcClient saslRpcClient; private Socket socket = null; // connected socket - private DataInputStream in; - private DataOutputStream out; + private IpcStreams ipcStreams; + private final int maxResponseLength; private final int rpcTimeout; private int maxIdleTime; //connections will be culled if it was idle for //maxIdleTime msecs @@ -426,8 +427,8 @@ public class Client implements AutoCloseable { private final boolean doPing; //do we need to send ping message private final int pingInterval; // how often sends ping to the server private final int soTimeout; // used by ipc ping and rpc timeout - private ResponseBuffer pingRequest; // ping message - + private byte[] pingRequest; // ping message + // currently active calls private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>(); private 
AtomicLong lastActivity = new AtomicLong();// last I/O activity time @@ -446,6 +447,9 @@ public class Client implements AutoCloseable { 0, new UnknownHostException()); } + this.maxResponseLength = remoteId.conf.getInt( + CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH, + CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT); this.rpcTimeout = remoteId.getRpcTimeout(); this.maxIdleTime = remoteId.getMaxIdleTime(); this.connectionRetryPolicy = remoteId.connectionRetryPolicy; @@ -456,12 +460,13 @@ public class Client implements AutoCloseable { this.doPing = remoteId.getDoPing(); if (doPing) { // construct a RPC header with the callId as the ping callId - pingRequest = new ResponseBuffer(); + ResponseBuffer buf = new ResponseBuffer(); RpcRequestHeaderProto pingHeader = ProtoUtil .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, PING_CALL_ID, RpcConstants.INVALID_RETRY_COUNT, clientId); - pingHeader.writeDelimitedTo(pingRequest); + pingHeader.writeDelimitedTo(buf); + pingRequest = buf.toByteArray(); } this.pingInterval = remoteId.getPingInterval(); if (rpcTimeout > 0) { @@ -596,15 +601,15 @@ public class Client implements AutoCloseable { } return false; } - - private synchronized AuthMethod setupSaslConnection(final InputStream in2, - final OutputStream out2) throws IOException { + + private synchronized AuthMethod setupSaslConnection(IpcStreams streams) + throws IOException { // Do not use Client.conf here! We must use ConnectionId.conf, since the // Client object is cached and shared between all RPC clients, even those // for separate services. 
saslRpcClient = new SaslRpcClient(remoteId.getTicket(), remoteId.getProtocol(), remoteId.getAddress(), remoteId.conf); - return saslRpcClient.saslConnect(in2, out2); + return saslRpcClient.saslConnect(streams); } /** @@ -770,12 +775,9 @@ public class Client implements AutoCloseable { Random rand = null; while (true) { setupConnection(); - InputStream inStream = NetUtils.getInputStream(socket); - OutputStream outStream = NetUtils.getOutputStream(socket); - writeConnectionHeader(outStream); + ipcStreams = new IpcStreams(socket, maxResponseLength); + writeConnectionHeader(ipcStreams); if (authProtocol == AuthProtocol.SASL) { - final InputStream in2 = inStream; - final OutputStream out2 = outStream; UserGroupInformation ticket = remoteId.getTicket(); if (ticket.getRealUser() != null) { ticket = ticket.getRealUser(); @@ -786,7 +788,7 @@ public class Client implements AutoCloseable { @Override public AuthMethod run() throws IOException, InterruptedException { - return setupSaslConnection(in2, out2); + return setupSaslConnection(ipcStreams); } }); } catch (IOException ex) { @@ -805,8 +807,7 @@ public class Client implements AutoCloseable { } if (authMethod != AuthMethod.SIMPLE) { // Sasl connect is successful. Let's set up Sasl i/o streams. 
- inStream = saslRpcClient.getInputStream(inStream); - outStream = saslRpcClient.getOutputStream(outStream); + ipcStreams.setSaslClient(saslRpcClient); // for testing remoteId.saslQop = (String)saslRpcClient.getNegotiatedProperty(Sasl.QOP); @@ -825,18 +826,11 @@ public class Client implements AutoCloseable { } } } - + if (doPing) { - inStream = new PingInputStream(inStream); + ipcStreams.setInputStream(new PingInputStream(ipcStreams.in)); } - this.in = new DataInputStream(new BufferedInputStream(inStream)); - // SASL may have already buffered the stream - if (!(outStream instanceof BufferedOutputStream)) { - outStream = new BufferedOutputStream(outStream); - } - this.out = new DataOutputStream(outStream); - writeConnectionContext(remoteId, authMethod); // update last activity time @@ -950,17 +944,28 @@ public class Client implements AutoCloseable { * | AuthProtocol (1 byte) | * +----------------------------------+ */ - private void writeConnectionHeader(OutputStream outStream) + private void writeConnectionHeader(IpcStreams streams) throws IOException { - DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream)); - // Write out the header, version and authentication method - out.write(RpcConstants.HEADER.array()); - out.write(RpcConstants.CURRENT_VERSION); - out.write(serviceClass); - out.write(authProtocol.callId); - out.flush(); + // Write out the header, version and authentication method. + // The output stream is buffered but we must not flush it yet. The + // connection setup protocol requires the client to send multiple + // messages before reading a response. + // + // insecure: send header+context+call, read + // secure : send header+negotiate, read, (sasl), context+call, read + // + // The client must flush only when it's prepared to read. Otherwise + // "broken pipe" exceptions occur if the server closes the connection + // before all messages are sent. 
+ final DataOutputStream out = streams.out; + synchronized (out) { + out.write(RpcConstants.HEADER.array()); + out.write(RpcConstants.CURRENT_VERSION); + out.write(serviceClass); + out.write(authProtocol.callId); + } } - + /* Write the connection context header for each connection * Out is not synchronized because only the first thread does this. */ @@ -976,12 +981,17 @@ public class Client implements AutoCloseable { .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID, RpcConstants.INVALID_RETRY_COUNT, clientId); + // do not flush. the context and first ipc call request must be sent + // together to avoid possibility of broken pipes upon authz failure. + // see writeConnectionHeader final ResponseBuffer buf = new ResponseBuffer(); connectionContextHeader.writeDelimitedTo(buf); message.writeDelimitedTo(buf); - buf.writeTo(out); + synchronized (ipcStreams.out) { + ipcStreams.sendRequest(buf.toByteArray()); + } } - + /* wait till someone signals us to start reading RPC response or * it is idle too long, it is marked as to be closed, * or the client is marked as not running. 
@@ -1024,9 +1034,9 @@ public class Client implements AutoCloseable { long curTime = Time.now(); if ( curTime - lastActivity.get() >= pingInterval) { lastActivity.set(curTime); - synchronized (out) { - pingRequest.writeTo(out); - out.flush(); + synchronized (ipcStreams.out) { + ipcStreams.sendRequest(pingRequest); + ipcStreams.flush(); } } } @@ -1092,15 +1102,16 @@ public class Client implements AutoCloseable { @Override public void run() { try { - synchronized (Connection.this.out) { + synchronized (ipcStreams.out) { if (shouldCloseConnection.get()) { return; } if (LOG.isDebugEnabled()) { LOG.debug(getName() + " sending #" + call.id); } - buf.writeTo(out); // RpcRequestHeader + RpcRequest - out.flush(); + // RpcRequestHeader + RpcRequest + ipcStreams.sendRequest(buf.toByteArray()); + ipcStreams.flush(); } } catch (IOException e) { // exception at this point would leave the connection in an @@ -1141,10 +1152,7 @@ public class Client implements AutoCloseable { touch(); try { - int totalLen = in.readInt(); - ByteBuffer bb = ByteBuffer.allocate(totalLen); - in.readFully(bb.array()); - + ByteBuffer bb = ipcStreams.readResponse(); RpcWritable.Buffer packet = RpcWritable.Buffer.wrap(bb); RpcResponseHeaderProto header = packet.getValue(RpcResponseHeaderProto.getDefaultInstance()); @@ -1209,8 +1217,7 @@ public class Client implements AutoCloseable { connections.remove(remoteId, this); // close the streams and therefore the socket - IOUtils.closeStream(out); - IOUtils.closeStream(in); + IOUtils.closeStream(ipcStreams); disposeSasl(); // clean up all calls @@ -1739,4 +1746,75 @@ public class Client implements AutoCloseable { public void close() throws Exception { stop(); } + + /** Manages the input and output streams for an IPC connection. + * Only exposed for use by SaslRpcClient. 
+ */ + @InterfaceAudience.Private + public static class IpcStreams implements Closeable, Flushable { + private DataInputStream in; + public DataOutputStream out; + private int maxResponseLength; + private boolean firstResponse = true; + + IpcStreams(Socket socket, int maxResponseLength) throws IOException { + this.maxResponseLength = maxResponseLength; + setInputStream( + new BufferedInputStream(NetUtils.getInputStream(socket))); + setOutputStream( + new BufferedOutputStream(NetUtils.getOutputStream(socket))); + } + + void setSaslClient(SaslRpcClient client) throws IOException { + setInputStream(client.getInputStream(in)); + setOutputStream(client.getOutputStream(out)); + } + + private void setInputStream(InputStream is) { + this.in = (is instanceof DataInputStream) + ? (DataInputStream)is : new DataInputStream(is); + } + + private void setOutputStream(OutputStream os) { + this.out = (os instanceof DataOutputStream) + ? (DataOutputStream)os : new DataOutputStream(os); + } + + public ByteBuffer readResponse() throws IOException { + int length = in.readInt(); + if (firstResponse) { + firstResponse = false; + // pre-rpcv9 exception, almost certainly a version mismatch. + if (length == -1) { + in.readInt(); // ignore fatal/error status, it's fatal for us. 
+ throw new RemoteException(WritableUtils.readString(in), + WritableUtils.readString(in)); + } + } + if (length <= 0) { + throw new RpcException("RPC response has invalid length"); + } + if (maxResponseLength > 0 && length > maxResponseLength) { + throw new RpcException("RPC response exceeds maximum data length"); + } + ByteBuffer bb = ByteBuffer.allocate(length); + in.readFully(bb.array()); + return bb; + } + + public void sendRequest(byte[] buf) throws IOException { + out.write(buf); + } + + @Override + public void flush() throws IOException { + out.flush(); + } + + @Override + public void close() { + IOUtils.closeStream(out); + IOUtils.closeStream(in); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4d07687/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java index 60ae3b0..cd942b7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java @@ -18,11 +18,9 @@ package org.apache.hadoop.security; -import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.ByteArrayInputStream; import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.FilterInputStream; import java.io.FilterOutputStream; import java.io.IOException; @@ -53,6 +51,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.GlobPattern; +import org.apache.hadoop.ipc.Client.IpcStreams; import org.apache.hadoop.ipc.RPC.RpcKind; import 
org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.ResponseBuffer; @@ -353,24 +352,16 @@ public class SaslRpcClient { * @return AuthMethod used to negotiate the connection * @throws IOException */ - public AuthMethod saslConnect(InputStream inS, OutputStream outS) - throws IOException { - DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS)); - DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream( - outS)); - + public AuthMethod saslConnect(IpcStreams ipcStreams) throws IOException { // redefined if/when a SASL negotiation starts, can be queried if the // negotiation fails authMethod = AuthMethod.SIMPLE; - sendSaslMessage(outStream, negotiateRequest); - + sendSaslMessage(ipcStreams.out, negotiateRequest); // loop until sasl is complete or a rpc error occurs boolean done = false; do { - int rpcLen = inStream.readInt(); - ByteBuffer bb = ByteBuffer.allocate(rpcLen); - inStream.readFully(bb.array()); + ByteBuffer bb = ipcStreams.readResponse(); RpcWritable.Buffer saslPacket = RpcWritable.Buffer.wrap(bb); RpcResponseHeaderProto header = @@ -447,7 +438,7 @@ public class SaslRpcClient { } } if (response != null) { - sendSaslMessage(outStream, response.build()); + sendSaslMessage(ipcStreams.out, response.build()); } } while (!done); return authMethod; @@ -461,8 +452,10 @@ public class SaslRpcClient { ResponseBuffer buf = new ResponseBuffer(); saslHeader.writeDelimitedTo(buf); message.writeDelimitedTo(buf); - buf.writeTo(out); - out.flush(); + synchronized (out) { + buf.writeTo(out); + out.flush(); + } } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4d07687/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 569d894..0a12017 100644 --- 
a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -1313,10 +1313,19 @@ <name>ipc.maximum.data.length</name> <value>67108864</value> <description>This indicates the maximum IPC message length (bytes) that can be - accepted by the server. Messages larger than this value are rejected by - server immediately. This setting should rarely need to be changed. It merits - investigating whether the cause of long RPC messages can be fixed instead, - e.g. by splitting into smaller messages. + accepted by the server. Messages larger than this value are rejected by the + server immediately to avoid possible OOMs. This setting should rarely need to be + changed. + </description> +</property> + +<property> + <name>ipc.maximum.response.length</name> + <value>134217728</value> + <description>This indicates the maximum IPC message length (bytes) that can be + accepted by the client. Messages larger than this value are rejected + immediately to avoid possible OOMs. This setting should rarely need to be + changed. Set to 0 to disable. 
</description> </property> http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4d07687/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 82da62d..6fb3511 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -40,6 +40,7 @@ import java.io.OutputStream; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; +import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; import java.util.ArrayList; @@ -49,6 +50,8 @@ import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -76,6 +79,9 @@ import org.apache.hadoop.ipc.Server.Connection; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.StringUtils; @@ -112,6 +118,8 @@ public class TestIPC { public void setupConf() { conf = new 
Configuration(); Client.setPingInterval(conf, PING_INTERVAL); + // tests may enable security, so disable before each test + UserGroupInformation.setConfiguration(conf); } static final Random RANDOM = new Random(); @@ -123,8 +131,8 @@ public class TestIPC { static ConnectionId getConnectionId(InetSocketAddress addr, int rpcTimeout, Configuration conf) throws IOException { - return ConnectionId.getConnectionId(addr, null, null, rpcTimeout, null, - conf); + return ConnectionId.getConnectionId(addr, null, + UserGroupInformation.getCurrentUser(), rpcTimeout, null, conf); } static Writable call(Client client, InetSocketAddress addr, @@ -1402,6 +1410,80 @@ public class TestIPC { client.stop(); } + @Test(timeout=4000) + public void testInsecureVersionMismatch() throws IOException { + checkVersionMismatch(); + } + + @Test(timeout=4000) + public void testSecureVersionMismatch() throws IOException { + SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf); + UserGroupInformation.setConfiguration(conf); + checkVersionMismatch(); + } + + private void checkVersionMismatch() throws IOException { + try (final ServerSocket listenSocket = new ServerSocket()) { + listenSocket.bind(null); + InetSocketAddress addr = + (InetSocketAddress) listenSocket.getLocalSocketAddress(); + + // open a socket that accepts a client and immediately returns + // a version mismatch exception. + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.submit(new Runnable(){ + @Override + public void run() { + try { + Socket socket = listenSocket.accept(); + socket.getOutputStream().write( + NetworkTraces.RESPONSE_TO_HADOOP_0_20_3_RPC); + socket.close(); + } catch (Throwable t) { + // ignore. 
+ } + } + }); + + try { + Client client = new Client(LongWritable.class, conf); + call(client, 0, addr, conf); + } catch (RemoteException re) { + Assert.assertEquals(RPC.VersionMismatch.class.getName(), + re.getClassName()); + Assert.assertEquals(NetworkTraces.HADOOP0_20_ERROR_MSG, + re.getMessage()); + return; + } + Assert.fail("didn't get version mismatch"); + } + } + + @Test + public void testRpcResponseLimit() throws Throwable { + Server server = new TestServer(1, false); + InetSocketAddress addr = NetUtils.getConnectAddress(server); + server.start(); + + conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH, 0); + Client client = new Client(LongWritable.class, conf); + call(client, 0, addr, conf); + + conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH, 4); + client = new Client(LongWritable.class, conf); + try { + call(client, 0, addr, conf); + } catch (IOException ioe) { + Throwable t = ioe.getCause(); + Assert.assertNotNull(t); + Assert.assertEquals(RpcException.class, t.getClass()); + Assert.assertEquals("RPC response exceeds maximum data length", + t.getMessage()); + return; + } + Assert.fail("didn't get limit exceeded"); + } + private void doIpcVersionTest( byte[] requestData, byte[] expectedResponse) throws IOException { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org