Author: atm
Date: Fri May 16 21:25:05 2014
New Revision: 1595352

URL: http://svn.apache.org/r1595352
Log:
HDFS-6406. Add capability for NFS gateway to reject connections from 
unprivileged ports. Contributed by Aaron T. Myers.

Added:
    
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/resources/
    
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/resources/log4j.properties
Modified:
    
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
    
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java

Modified: 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java?rev=1595352&r1=1595351&r2=1595352&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
 Fri May 16 21:25:05 2014
@@ -19,11 +19,14 @@ package org.apache.hadoop.oncrpc;
 
 import java.io.IOException;
 import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
 import org.apache.hadoop.oncrpc.security.Verifier;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.apache.hadoop.portmap.PortmapMapping;
 import org.apache.hadoop.portmap.PortmapRequest;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -37,7 +40,7 @@ import org.jboss.netty.channel.SimpleCha
  * and implement {@link #handleInternal} to handle the requests received.
  */
 public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
-  private static final Log LOG = LogFactory.getLog(RpcProgram.class);
+  static final Log LOG = LogFactory.getLog(RpcProgram.class);
   public static final int RPCB_PORT = 111;
   private final String program;
   private final String host;
@@ -45,6 +48,7 @@ public abstract class RpcProgram extends
   private final int progNumber;
   private final int lowProgVersion;
   private final int highProgVersion;
+  private final boolean allowInsecurePorts;
   
   /**
    * If not null, this will be used as the socket to use to connect to the
@@ -61,10 +65,14 @@ public abstract class RpcProgram extends
    * @param progNumber program number as defined in RFC 1050
    * @param lowProgVersion lowest version of the specification supported
    * @param highProgVersion highest version of the specification supported
+   * @param DatagramSocket registrationSocket if not null, use this socket to
+   *        register with portmap daemon
+   * @param allowInsecurePorts true to allow client connections from
+   *        unprivileged ports, false otherwise
    */
   protected RpcProgram(String program, String host, int port, int progNumber,
       int lowProgVersion, int highProgVersion,
-      DatagramSocket registrationSocket) {
+      DatagramSocket registrationSocket, boolean allowInsecurePorts) {
     this.program = program;
     this.host = host;
     this.port = port;
@@ -72,6 +80,9 @@ public abstract class RpcProgram extends
     this.lowProgVersion = lowProgVersion;
     this.highProgVersion = highProgVersion;
     this.registrationSocket = registrationSocket;
+    this.allowInsecurePorts = allowInsecurePorts;
+    LOG.info("Will " + (allowInsecurePorts ? "" : "not ") + "accept client "
+        + "connections from unprivileged ports");
   }
 
   /**
@@ -133,43 +144,82 @@ public abstract class RpcProgram extends
       throws Exception {
     RpcInfo info = (RpcInfo) e.getMessage();
     RpcCall call = (RpcCall) info.header();
+    
+    SocketAddress remoteAddress = info.remoteAddress();
+    if (!allowInsecurePorts) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Will not allow connections from unprivileged ports. " +
+            "Checking for valid client port...");
+      }
+      if (remoteAddress instanceof InetSocketAddress) {
+        InetSocketAddress inetRemoteAddress = (InetSocketAddress) 
remoteAddress;
+        if (inetRemoteAddress.getPort() > 1023) {
+          LOG.warn("Connection attempted from '" + inetRemoteAddress + "' "
+              + "which is an unprivileged port. Rejecting connection.");
+          sendRejectedReply(call, remoteAddress, ctx);
+          return;
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Accepting connection from '" + remoteAddress + "'");
+          }
+        }
+      } else {
+        LOG.warn("Could not determine remote port of socket address '" +
+            remoteAddress + "'. Rejecting connection.");
+        sendRejectedReply(call, remoteAddress, ctx);
+        return;
+      }
+    }
+    
     if (LOG.isTraceEnabled()) {
       LOG.trace(program + " procedure #" + call.getProcedure());
     }
     
     if (this.progNumber != call.getProgram()) {
       LOG.warn("Invalid RPC call program " + call.getProgram());
-      RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
-          AcceptState.PROG_UNAVAIL, Verifier.VERIFIER_NONE);
-
-      XDR out = new XDR();
-      reply.write(out);
-      ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
-          .buffer());
-      RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
-      RpcUtil.sendRpcResponse(ctx, rsp);
+      sendAcceptedReply(call, remoteAddress, AcceptState.PROG_UNAVAIL, ctx);
       return;
     }
 
     int ver = call.getVersion();
     if (ver < lowProgVersion || ver > highProgVersion) {
       LOG.warn("Invalid RPC call version " + ver);
-      RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
-          AcceptState.PROG_MISMATCH, Verifier.VERIFIER_NONE);
-
-      XDR out = new XDR();
-      reply.write(out);
-      out.writeInt(lowProgVersion);
-      out.writeInt(highProgVersion);
-      ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
-          .buffer());
-      RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
-      RpcUtil.sendRpcResponse(ctx, rsp);
+      sendAcceptedReply(call, remoteAddress, AcceptState.PROG_MISMATCH, ctx);
       return;
     }
     
     handleInternal(ctx, info);
   }
+  
+  private void sendAcceptedReply(RpcCall call, SocketAddress remoteAddress,
+      AcceptState acceptState, ChannelHandlerContext ctx) {
+    RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
+        acceptState, Verifier.VERIFIER_NONE);
+
+    XDR out = new XDR();
+    reply.write(out);
+    if (acceptState == AcceptState.PROG_MISMATCH) {
+      out.writeInt(lowProgVersion);
+      out.writeInt(highProgVersion);
+    }
+    ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+        .buffer());
+    RpcResponse rsp = new RpcResponse(b, remoteAddress);
+    RpcUtil.sendRpcResponse(ctx, rsp);
+  }
+  
+  private static void sendRejectedReply(RpcCall call,
+      SocketAddress remoteAddress, ChannelHandlerContext ctx) {
+    XDR out = new XDR();
+    RpcDeniedReply reply = new RpcDeniedReply(call.getXid(),
+        RpcReply.ReplyState.MSG_DENIED,
+        RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone());
+    reply.write(out);
+    ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+        .buffer());
+    RpcResponse rsp = new RpcResponse(buf, remoteAddress);
+    RpcUtil.sendRpcResponse(ctx, rsp);
+  }
 
   protected abstract void handleInternal(ChannelHandlerContext ctx, RpcInfo 
info);
   

Modified: 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java?rev=1595352&r1=1595351&r2=1595352&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
 (original)
+++ 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
 Fri May 16 21:25:05 2014
@@ -28,6 +28,8 @@ import java.util.Random;
 import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder;
 import org.apache.hadoop.oncrpc.security.CredentialsNone;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
+import org.apache.log4j.Level;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
@@ -38,10 +40,16 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 public class TestFrameDecoder {
+  
+  static {
+    ((Log4JLogger) RpcProgram.LOG).getLogger().setLevel(Level.ALL);
+  }
 
   private static int resultSize;
 
   static void testRequest(XDR request, int serverPort) {
+    // Reset resultSize so as to avoid interference from other tests in this 
class.
+    resultSize = 0;
     SimpleTcpClient tcpClient = new SimpleTcpClient("localhost", serverPort, 
request,
         true);
     tcpClient.run();
@@ -50,9 +58,10 @@ public class TestFrameDecoder {
   static class TestRpcProgram extends RpcProgram {
 
     protected TestRpcProgram(String program, String host, int port,
-        int progNumber, int lowProgVersion, int highProgVersion) {
+        int progNumber, int lowProgVersion, int highProgVersion,
+        boolean allowInsecurePorts) {
       super(program, host, port, progNumber, lowProgVersion, highProgVersion,
-          null);
+          null, allowInsecurePorts);
     }
 
     @Override
@@ -149,7 +158,41 @@ public class TestFrameDecoder {
 
   @Test
   public void testFrames() {
+    int serverPort = startRpcServer(true);
 
+    XDR xdrOut = createGetportMount();
+    int headerSize = xdrOut.size();
+    int bufsize = 2 * 1024 * 1024;
+    byte[] buffer = new byte[bufsize];
+    xdrOut.writeFixedOpaque(buffer);
+    int requestSize = xdrOut.size() - headerSize;
+
+    // Send the request to the server
+    testRequest(xdrOut, serverPort);
+
+    // Verify the server got the request with right size
+    assertEquals(requestSize, resultSize);
+  }
+  
+  @Test
+  public void testUnprivilegedPort() {
+    // Don't allow connections from unprivileged ports. Given that this test is
+    // presumably not being run by root, this will be the case.
+    int serverPort = startRpcServer(false);
+
+    XDR xdrOut = createGetportMount();
+    int bufsize = 2 * 1024 * 1024;
+    byte[] buffer = new byte[bufsize];
+    xdrOut.writeFixedOpaque(buffer);
+
+    // Send the request to the server
+    testRequest(xdrOut, serverPort);
+
+    // Verify the server rejected the request.
+    assertEquals(0, resultSize);
+  }
+  
+  private static int startRpcServer(boolean allowInsecurePorts) {
     Random rand = new Random();
     int serverPort = 30000 + rand.nextInt(10000);
     int retries = 10;    // A few retries in case initial choice is in use.
@@ -157,7 +200,7 @@ public class TestFrameDecoder {
     while (true) {
       try {
         RpcProgram program = new 
TestFrameDecoder.TestRpcProgram("TestRpcProgram",
-            "localhost", serverPort, 100000, 1, 2);
+            "localhost", serverPort, 100000, 1, 2, allowInsecurePorts);
         SimpleTcpServer tcpServer = new SimpleTcpServer(serverPort, program, 
1);
         tcpServer.run();
         break;          // Successfully bound a port, break out.
@@ -169,19 +212,7 @@ public class TestFrameDecoder {
         }
       }
     }
-
-    XDR xdrOut = createGetportMount();
-    int headerSize = xdrOut.size();
-    int bufsize = 2 * 1024 * 1024;
-    byte[] buffer = new byte[bufsize];
-    xdrOut.writeFixedOpaque(buffer);
-    int requestSize = xdrOut.size() - headerSize;
-
-    // Send the request to the server
-    testRequest(xdrOut, serverPort);
-
-    // Verify the server got the request with right size
-    assertEquals(requestSize, resultSize);
+    return serverPort;
   }
 
   static void createPortmapXDRheader(XDR xdr_out, int procedure) {

Added: 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/resources/log4j.properties
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/resources/log4j.properties?rev=1595352&view=auto
==============================================================================
--- 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/resources/log4j.properties
 (added)
+++ 
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/resources/log4j.properties
 Fri May 16 21:25:05 2014
@@ -0,0 +1,18 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshhold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} 
(%F:%M(%L)) - %m%n


Reply via email to