HADOOP-10940. RPC client does no bounds checking of responses. Contributed by Daryn Sharp.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d4d07687
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d4d07687
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d4d07687

Branch: refs/heads/HDFS-10285
Commit: d4d076876a8d0002bd3a73491d8459d11cb4896c
Parents: baab489
Author: Kihwal Lee <kih...@apache.org>
Authored: Fri Sep 9 10:39:35 2016 -0500
Committer: Kihwal Lee <kih...@apache.org>
Committed: Fri Sep 9 10:39:35 2016 -0500

----------------------------------------------------------------------
 .../dev-support/findbugsExcludeFile.xml         |  10 +-
 .../hadoop/fs/CommonConfigurationKeys.java      |  12 +-
 .../main/java/org/apache/hadoop/ipc/Client.java | 178 +++++++++++++------
 .../apache/hadoop/security/SaslRpcClient.java   |  25 +--
 .../src/main/resources/core-default.xml         |  17 +-
 .../java/org/apache/hadoop/ipc/TestIPC.java     |  86 ++++++++-
 6 files changed, 246 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
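Context: before this patch, Client.Connection#receiveRpcResponse read the
length prefix sent by the server and allocated a buffer of exactly that size
with no validation (see the removed readInt()/allocate() lines in the
Client.java hunk below), so a malicious or corrupt response could force an
arbitrarily large allocation. A minimal sketch of the check the patch adds,
simplified from the new IpcStreams.readResponse() -- class and exception
message strings here are illustrative, not the exact Hadoop code:

  import java.io.DataInputStream;
  import java.io.IOException;
  import java.nio.ByteBuffer;

  class BoundedResponseReader {
    // Validate the length prefix before allocating. A limit of 0 disables
    // the cap, matching the semantics of ipc.maximum.response.length.
    static ByteBuffer readResponse(DataInputStream in, int maxResponseLength)
        throws IOException {
      int length = in.readInt();
      if (length <= 0) {
        throw new IOException("RPC response has invalid length");
      }
      if (maxResponseLength > 0 && length > maxResponseLength) {
        throw new IOException("RPC response exceeds maximum data length");
      }
      ByteBuffer bb = ByteBuffer.allocate(length); // bounded, safe to allocate
      in.readFully(bb.array());
      return bb;
    }
  }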


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4d07687/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
index b650eae..ec7c396 100644
--- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
@@ -44,7 +44,7 @@
      --> 
      <Match>
        <Class name="org.apache.hadoop.ipc.Client$Connection" />
-       <Field name="out" />
+       <Field name="ipcStreams" />
        <Bug pattern="IS2_INCONSISTENT_SYNC" />
      </Match>
     <!--
@@ -341,13 +341,7 @@
        <Method name="removeRenewAction" />
        <Bug pattern="BC_UNCONFIRMED_CAST" />
      </Match>
-     
-     <!-- Inconsistent synchronization flagged by findbugs is not valid. -->
-     <Match>
-       <Class name="org.apache.hadoop.ipc.Client$Connection" />
-       <Field name="in" />
-       <Bug pattern="IS2_INCONSISTENT_SYNC" />
-     </Match>
+
      <!-- 
        The switch condition for INITIATE is expected to fallthru to RESPONSE
        to process initial sasl response token included in the INITIATE

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4d07687/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 76cf9cc..2b530f0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -78,12 +78,20 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   /** Default value for IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE */
   public static final int IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT =
       100;
-      
+
+  /** Max request size a server will accept. */
   public static final String IPC_MAXIMUM_DATA_LENGTH =
       "ipc.maximum.data.length";
-  
+  /** Default value for IPC_MAXIMUM_DATA_LENGTH. */
   public static final int IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 64 * 1024 * 1024;
 
+  /** Max response size a client will accept. */
+  public static final String IPC_MAXIMUM_RESPONSE_LENGTH =
+      "ipc.maximum.response.length";
+  /** Default value for IPC_MAXIMUM_RESPONSE_LENGTH. */
+  public static final int IPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT =
+      128 * 1024 * 1024;
+
   /** How many calls per handler are allowed in the queue. */
   public static final String  IPC_SERVER_HANDLER_QUEUE_SIZE_KEY =
     "ipc.server.handler.queue.size";

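The new client-side cap defaults to 128 MB, double the server-side request
cap ipc.maximum.data.length (64 MB). A sketch of overriding it
programmatically, the same pattern testRpcResponseLimit uses below; the
256 MB value is only an example:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.CommonConfigurationKeys;

  public class ResponseLimitExample {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // Example value: raise the cap from the 128 MB default.
      // Setting 0 disables the client-side length check entirely.
      conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH,
          256 * 1024 * 1024);
      // Each Client built from this conf applies the cap per connection.
    }
  }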
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4d07687/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 567b932..94f1945 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
@@ -413,8 +414,8 @@ public class Client implements AutoCloseable {
     private SaslRpcClient saslRpcClient;
     
     private Socket socket = null;                 // connected socket
-    private DataInputStream in;
-    private DataOutputStream out;
+    private IpcStreams ipcStreams;
+    private final int maxResponseLength;
     private final int rpcTimeout;
     private int maxIdleTime; //connections will be culled if it was idle for 
     //maxIdleTime msecs
@@ -426,8 +427,8 @@ public class Client implements AutoCloseable {
     private final boolean doPing; //do we need to send ping message
     private final int pingInterval; // how often sends ping to the server
     private final int soTimeout; // used by ipc ping and rpc timeout
-    private ResponseBuffer pingRequest; // ping message
-    
+    private byte[] pingRequest; // ping message
+
     // currently active calls
     private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
     private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
@@ -446,6 +447,9 @@ public class Client implements AutoCloseable {
             0,
             new UnknownHostException());
       }
+      this.maxResponseLength = remoteId.conf.getInt(
+          CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH,
+          CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
       this.rpcTimeout = remoteId.getRpcTimeout();
       this.maxIdleTime = remoteId.getMaxIdleTime();
       this.connectionRetryPolicy = remoteId.connectionRetryPolicy;
@@ -456,12 +460,13 @@ public class Client implements AutoCloseable {
       this.doPing = remoteId.getDoPing();
       if (doPing) {
         // construct a RPC header with the callId as the ping callId
-        pingRequest = new ResponseBuffer();
+        ResponseBuffer buf = new ResponseBuffer();
         RpcRequestHeaderProto pingHeader = ProtoUtil
             .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
                 OperationProto.RPC_FINAL_PACKET, PING_CALL_ID,
                 RpcConstants.INVALID_RETRY_COUNT, clientId);
-        pingHeader.writeDelimitedTo(pingRequest);
+        pingHeader.writeDelimitedTo(buf);
+        pingRequest = buf.toByteArray();
       }
       this.pingInterval = remoteId.getPingInterval();
       if (rpcTimeout > 0) {
@@ -596,15 +601,15 @@ public class Client implements AutoCloseable {
       }
       return false;
     }
-    
-    private synchronized AuthMethod setupSaslConnection(final InputStream in2, 
-        final OutputStream out2) throws IOException {
+
+    private synchronized AuthMethod setupSaslConnection(IpcStreams streams)
+        throws IOException {
       // Do not use Client.conf here! We must use ConnectionId.conf, since the
       // Client object is cached and shared between all RPC clients, even those
       // for separate services.
       saslRpcClient = new SaslRpcClient(remoteId.getTicket(),
           remoteId.getProtocol(), remoteId.getAddress(), remoteId.conf);
-      return saslRpcClient.saslConnect(in2, out2);
+      return saslRpcClient.saslConnect(streams);
     }
 
     /**
@@ -770,12 +775,9 @@ public class Client implements AutoCloseable {
         Random rand = null;
         while (true) {
           setupConnection();
-          InputStream inStream = NetUtils.getInputStream(socket);
-          OutputStream outStream = NetUtils.getOutputStream(socket);
-          writeConnectionHeader(outStream);
+          ipcStreams = new IpcStreams(socket, maxResponseLength);
+          writeConnectionHeader(ipcStreams);
           if (authProtocol == AuthProtocol.SASL) {
-            final InputStream in2 = inStream;
-            final OutputStream out2 = outStream;
             UserGroupInformation ticket = remoteId.getTicket();
             if (ticket.getRealUser() != null) {
               ticket = ticket.getRealUser();
@@ -786,7 +788,7 @@ public class Client implements AutoCloseable {
                     @Override
                     public AuthMethod run()
                         throws IOException, InterruptedException {
-                      return setupSaslConnection(in2, out2);
+                      return setupSaslConnection(ipcStreams);
                     }
                   });
             } catch (IOException ex) {
@@ -805,8 +807,7 @@ public class Client implements AutoCloseable {
             }
             if (authMethod != AuthMethod.SIMPLE) {
               // Sasl connect is successful. Let's set up Sasl i/o streams.
-              inStream = saslRpcClient.getInputStream(inStream);
-              outStream = saslRpcClient.getOutputStream(outStream);
+              ipcStreams.setSaslClient(saslRpcClient);
               // for testing
               remoteId.saslQop =
                   (String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
@@ -825,18 +826,11 @@ public class Client implements AutoCloseable {
               }
             }
           }
-        
+
           if (doPing) {
-            inStream = new PingInputStream(inStream);
+            ipcStreams.setInputStream(new PingInputStream(ipcStreams.in));
           }
-          this.in = new DataInputStream(new BufferedInputStream(inStream));
 
-          // SASL may have already buffered the stream
-          if (!(outStream instanceof BufferedOutputStream)) {
-            outStream = new BufferedOutputStream(outStream);
-          }
-          this.out = new DataOutputStream(outStream);
-          
           writeConnectionContext(remoteId, authMethod);
 
           // update last activity time
@@ -950,17 +944,28 @@ public class Client implements AutoCloseable {
      * |  AuthProtocol (1 byte)           |      
      * +----------------------------------+
      */
-    private void writeConnectionHeader(OutputStream outStream)
+    private void writeConnectionHeader(IpcStreams streams)
         throws IOException {
-      DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
-      // Write out the header, version and authentication method
-      out.write(RpcConstants.HEADER.array());
-      out.write(RpcConstants.CURRENT_VERSION);
-      out.write(serviceClass);
-      out.write(authProtocol.callId);
-      out.flush();
+      // Write out the header, version and authentication method.
+      // The output stream is buffered but we must not flush it yet.  The
+      // connection setup protocol requires the client to send multiple
+      // messages before reading a response.
+      //
+      //   insecure: send header+context+call, read
+      //   secure  : send header+negotiate, read, (sasl), context+call, read
+      //
+      // The client must flush only when it's prepared to read.  Otherwise
+      // "broken pipe" exceptions occur if the server closes the connection
+      // before all messages are sent.
+      final DataOutputStream out = streams.out;
+      synchronized (out) {
+        out.write(RpcConstants.HEADER.array());
+        out.write(RpcConstants.CURRENT_VERSION);
+        out.write(serviceClass);
+        out.write(authProtocol.callId);
+      }
     }
-    
+
     /* Write the connection context header for each connection
      * Out is not synchronized because only the first thread does this.
      */
@@ -976,12 +981,17 @@ public class Client implements AutoCloseable {
           .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
               OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
               RpcConstants.INVALID_RETRY_COUNT, clientId);
+      // do not flush.  the context and first ipc call request must be sent
+      // together to avoid possibility of broken pipes upon authz failure.
+      // see writeConnectionHeader
       final ResponseBuffer buf = new ResponseBuffer();
       connectionContextHeader.writeDelimitedTo(buf);
       message.writeDelimitedTo(buf);
-      buf.writeTo(out);
+      synchronized (ipcStreams.out) {
+        ipcStreams.sendRequest(buf.toByteArray());
+      }
     }
-    
+
     /* wait till someone signals us to start reading RPC response or
      * it is idle too long, it is marked as to be closed, 
      * or the client is marked as not running.
@@ -1024,9 +1034,9 @@ public class Client implements AutoCloseable {
       long curTime = Time.now();
       if ( curTime - lastActivity.get() >= pingInterval) {
         lastActivity.set(curTime);
-        synchronized (out) {
-          pingRequest.writeTo(out);
-          out.flush();
+        synchronized (ipcStreams.out) {
+          ipcStreams.sendRequest(pingRequest);
+          ipcStreams.flush();
         }
       }
     }
@@ -1092,15 +1102,16 @@ public class Client implements AutoCloseable {
           @Override
           public void run() {
             try {
-              synchronized (Connection.this.out) {
+              synchronized (ipcStreams.out) {
                 if (shouldCloseConnection.get()) {
                   return;
                 }
                 if (LOG.isDebugEnabled()) {
                   LOG.debug(getName() + " sending #" + call.id);
                 }
-                buf.writeTo(out); // RpcRequestHeader + RpcRequest
-                out.flush();
+                // RpcRequestHeader + RpcRequest
+                ipcStreams.sendRequest(buf.toByteArray());
+                ipcStreams.flush();
               }
             } catch (IOException e) {
               // exception at this point would leave the connection in an
@@ -1141,10 +1152,7 @@ public class Client implements AutoCloseable {
       touch();
       
       try {
-        int totalLen = in.readInt();
-        ByteBuffer bb = ByteBuffer.allocate(totalLen);
-        in.readFully(bb.array());
-
+        ByteBuffer bb = ipcStreams.readResponse();
         RpcWritable.Buffer packet = RpcWritable.Buffer.wrap(bb);
         RpcResponseHeaderProto header =
             packet.getValue(RpcResponseHeaderProto.getDefaultInstance());
@@ -1209,8 +1217,7 @@ public class Client implements AutoCloseable {
       connections.remove(remoteId, this);
 
       // close the streams and therefore the socket
-      IOUtils.closeStream(out);
-      IOUtils.closeStream(in);
+      IOUtils.closeStream(ipcStreams);
       disposeSasl();
 
       // clean up all calls
@@ -1739,4 +1746,75 @@ public class Client implements AutoCloseable {
   public void close() throws Exception {
     stop();
   }
+
+  /** Manages the input and output streams for an IPC connection.
+   *  Only exposed for use by SaslRpcClient.
+   */
+  @InterfaceAudience.Private
+  public static class IpcStreams implements Closeable, Flushable {
+    private DataInputStream in;
+    public DataOutputStream out;
+    private int maxResponseLength;
+    private boolean firstResponse = true;
+
+    IpcStreams(Socket socket, int maxResponseLength) throws IOException {
+      this.maxResponseLength = maxResponseLength;
+      setInputStream(
+          new BufferedInputStream(NetUtils.getInputStream(socket)));
+      setOutputStream(
+          new BufferedOutputStream(NetUtils.getOutputStream(socket)));
+    }
+
+    void setSaslClient(SaslRpcClient client) throws IOException {
+      setInputStream(client.getInputStream(in));
+      setOutputStream(client.getOutputStream(out));
+    }
+
+    private void setInputStream(InputStream is) {
+      this.in = (is instanceof DataInputStream)
+          ? (DataInputStream)is : new DataInputStream(is);
+    }
+
+    private void setOutputStream(OutputStream os) {
+      this.out = (os instanceof DataOutputStream)
+          ? (DataOutputStream)os : new DataOutputStream(os);
+    }
+
+    public ByteBuffer readResponse() throws IOException {
+      int length = in.readInt();
+      if (firstResponse) {
+        firstResponse = false;
+        // pre-rpcv9 exception, almost certainly a version mismatch.
+        if (length == -1) {
+          in.readInt(); // ignore fatal/error status, it's fatal for us.
+          throw new RemoteException(WritableUtils.readString(in),
+                                    WritableUtils.readString(in));
+        }
+      }
+      if (length <= 0) {
+        throw new RpcException("RPC response has invalid length");
+      }
+      if (maxResponseLength > 0 && length > maxResponseLength) {
+        throw new RpcException("RPC response exceeds maximum data length");
+      }
+      ByteBuffer bb = ByteBuffer.allocate(length);
+      in.readFully(bb.array());
+      return bb;
+    }
+
+    public void sendRequest(byte[] buf) throws IOException {
+      out.write(buf);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      out.flush();
+    }
+
+    @Override
+    public void close() {
+      IOUtils.closeStream(out);
+      IOUtils.closeStream(in);
+    }
+  }
 }

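The firstResponse special case above exists because pre-rpcv9 servers answer
a version mismatch by writing -1 where the length would be, followed by a
status int and two Writable strings (exception class name, then message).
A self-contained demo of that legacy frame, parsed the way readResponse()
parses its first response; it assumes only hadoop-common on the classpath,
and the class/message strings are illustrative:

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;
  import org.apache.hadoop.io.WritableUtils;

  public class LegacyErrorFrameDemo {
    public static void main(String[] args) throws IOException {
      // Build a frame the way a pre-rpcv9 server reports a version mismatch.
      ByteArrayOutputStream frame = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(frame);
      out.writeInt(-1);                 // legacy error marker, not a length
      out.writeInt(0);                  // fatal/error status (client ignores it)
      WritableUtils.writeString(out, "org.apache.hadoop.ipc.RPC$VersionMismatch");
      WritableUtils.writeString(out, "example version mismatch message");

      // Parse it as readResponse() does, on the first response only.
      DataInputStream in = new DataInputStream(
          new ByteArrayInputStream(frame.toByteArray()));
      if (in.readInt() == -1) {
        in.readInt();                   // skip the status
        System.out.println(WritableUtils.readString(in) + ": "
            + WritableUtils.readString(in));
      }
    }
  }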
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4d07687/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java
index 60ae3b0..cd942b7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java
@@ -18,11 +18,9 @@
 
 package org.apache.hadoop.security;
 
-import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.FilterInputStream;
 import java.io.FilterOutputStream;
 import java.io.IOException;
@@ -53,6 +51,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.GlobPattern;
+import org.apache.hadoop.ipc.Client.IpcStreams;
 import org.apache.hadoop.ipc.RPC.RpcKind;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.ResponseBuffer;
@@ -353,24 +352,16 @@ public class SaslRpcClient {
    * @return AuthMethod used to negotiate the connection
    * @throws IOException
    */
-  public AuthMethod saslConnect(InputStream inS, OutputStream outS)
-      throws IOException {
-    DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS));
-    DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(
-        outS));
-    
+  public AuthMethod saslConnect(IpcStreams ipcStreams) throws IOException {
     // redefined if/when a SASL negotiation starts, can be queried if the
     // negotiation fails
     authMethod = AuthMethod.SIMPLE;
 
-    sendSaslMessage(outStream, negotiateRequest);
-
+    sendSaslMessage(ipcStreams.out, negotiateRequest);
     // loop until sasl is complete or a rpc error occurs
     boolean done = false;
     do {
-      int rpcLen = inStream.readInt();
-      ByteBuffer bb = ByteBuffer.allocate(rpcLen);
-      inStream.readFully(bb.array());
+      ByteBuffer bb = ipcStreams.readResponse();
 
       RpcWritable.Buffer saslPacket = RpcWritable.Buffer.wrap(bb);
       RpcResponseHeaderProto header =
@@ -447,7 +438,7 @@ public class SaslRpcClient {
         }
       }
       if (response != null) {
-        sendSaslMessage(outStream, response.build());
+        sendSaslMessage(ipcStreams.out, response.build());
       }
     } while (!done);
     return authMethod;
@@ -461,8 +452,10 @@ public class SaslRpcClient {
     ResponseBuffer buf = new ResponseBuffer();
     saslHeader.writeDelimitedTo(buf);
     message.writeDelimitedTo(buf);
-    buf.writeTo(out);
-    out.flush();
+    synchronized (out) {
+      buf.writeTo(out);
+      out.flush();
+    }
   }
 
   /**

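Note that because saslConnect() now reads negotiation tokens through
IpcStreams.readResponse(), SASL responses get the same length validation as
ordinary RPC responses, so the bounds check also covers connection
negotiation. The synchronized block added to sendSaslMessage() appears
intended to keep all writers serialized on the same stream monitor, matching
the synchronized (ipcStreams.out) blocks in Client.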
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4d07687/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 569d894..0a12017 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1313,10 +1313,19 @@
   <name>ipc.maximum.data.length</name>
   <value>67108864</value>
   <description>This indicates the maximum IPC message length (bytes) that can be
 -    accepted by the server. Messages larger than this value are rejected by
 -    server immediately. This setting should rarely need to be changed. It merits
 -    investigating whether the cause of long RPC messages can be fixed instead,
 -    e.g. by splitting into smaller messages.
 +    accepted by the server. Messages larger than this value are rejected by the
 +    server immediately to avoid possible OOMs. This setting should rarely need
 +    to be changed.
+  </description>
+</property>
+
+<property>
+  <name>ipc.maximum.response.length</name>
+  <value>134217728</value>
+  <description>This indicates the maximum IPC message length (bytes) that can be
+    accepted by the client. Messages larger than this value are rejected
+    immediately to avoid possible OOMs. This setting should rarely need to be
+    changed.  Set to 0 to disable.
   </description>
 </property>
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4d07687/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
index 82da62d..6fb3511 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
@@ -40,6 +40,7 @@ import java.io.OutputStream;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
@@ -49,6 +50,8 @@ import java.util.Random;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -76,6 +79,9 @@ import org.apache.hadoop.ipc.Server.Connection;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -112,6 +118,8 @@ public class TestIPC {
   public void setupConf() {
     conf = new Configuration();
     Client.setPingInterval(conf, PING_INTERVAL);
+    // tests may enable security, so disable before each test
+    UserGroupInformation.setConfiguration(conf);
   }
 
   static final Random RANDOM = new Random();
@@ -123,8 +131,8 @@ public class TestIPC {
 
   static ConnectionId getConnectionId(InetSocketAddress addr, int rpcTimeout,
       Configuration conf) throws IOException {
-    return ConnectionId.getConnectionId(addr, null, null, rpcTimeout, null,
-        conf);
+    return ConnectionId.getConnectionId(addr, null,
+        UserGroupInformation.getCurrentUser(), rpcTimeout, null, conf);
   }
 
   static Writable call(Client client, InetSocketAddress addr,
@@ -1402,6 +1410,80 @@ public class TestIPC {
     client.stop();
   }
   
+  @Test(timeout=4000)
+  public void testInsecureVersionMismatch() throws IOException {
+    checkVersionMismatch();
+  }
+
+  @Test(timeout=4000)
+  public void testSecureVersionMismatch() throws IOException {
+    SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
+    UserGroupInformation.setConfiguration(conf);
+    checkVersionMismatch();
+  }
+
+  private void checkVersionMismatch() throws IOException {
+    try (final ServerSocket listenSocket = new ServerSocket()) {
+      listenSocket.bind(null);
+      InetSocketAddress addr =
+          (InetSocketAddress) listenSocket.getLocalSocketAddress();
+
+      // open a socket that accepts a client and immediately returns
+      // a version mismatch exception.
+      ExecutorService executor = Executors.newSingleThreadExecutor();
+      executor.submit(new Runnable(){
+        @Override
+        public void run() {
+          try {
+            Socket socket = listenSocket.accept();
+            socket.getOutputStream().write(
+                NetworkTraces.RESPONSE_TO_HADOOP_0_20_3_RPC);
+            socket.close();
+          } catch (Throwable t) {
+            // ignore.
+          }
+        }
+      });
+
+      try {
+        Client client = new Client(LongWritable.class, conf);
+        call(client, 0, addr, conf);
+      } catch (RemoteException re) {
+        Assert.assertEquals(RPC.VersionMismatch.class.getName(),
+            re.getClassName());
+        Assert.assertEquals(NetworkTraces.HADOOP0_20_ERROR_MSG,
+            re.getMessage());
+        return;
+      }
+      Assert.fail("didn't get version mismatch");
+    }
+  }
+
+  @Test
+  public void testRpcResponseLimit() throws Throwable {
+    Server server = new TestServer(1, false);
+    InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    server.start();
+
+    conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH, 0);
+    Client client = new Client(LongWritable.class, conf);
+    call(client, 0, addr, conf);
+
+    conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH, 4);
+    client = new Client(LongWritable.class, conf);
+    try {
+      call(client, 0, addr, conf);
+    } catch (IOException ioe) {
+      Throwable t = ioe.getCause();
+      Assert.assertNotNull(t);
+      Assert.assertEquals(RpcException.class, t.getClass());
+      Assert.assertEquals("RPC response exceeds maximum data length",
+          t.getMessage());
+      return;
+    }
+    Assert.fail("didn't get limit exceeded");
+  }
+
   private void doIpcVersionTest(
       byte[] requestData,
       byte[] expectedResponse) throws IOException {

