Author: atm Date: Fri May 16 21:25:05 2014 New Revision: 1595352 URL: http://svn.apache.org/r1595352 Log: HDFS-6406. Add capability for NFS gateway to reject connections from unprivileged ports. Contributed by Aaron T. Myers.
Added: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/resources/ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/resources/log4j.properties Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java?rev=1595352&r1=1595351&r2=1595352&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java Fri May 16 21:25:05 2014 @@ -19,11 +19,14 @@ package org.apache.hadoop.oncrpc; import java.io.IOException; import java.net.DatagramSocket; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState; import org.apache.hadoop.oncrpc.security.Verifier; +import org.apache.hadoop.oncrpc.security.VerifierNone; import org.apache.hadoop.portmap.PortmapMapping; import org.apache.hadoop.portmap.PortmapRequest; import org.jboss.netty.buffer.ChannelBuffer; @@ -37,7 +40,7 @@ import org.jboss.netty.channel.SimpleCha * and implement {@link #handleInternal} to handle the requests received. */ public abstract class RpcProgram extends SimpleChannelUpstreamHandler { - private static final Log LOG = LogFactory.getLog(RpcProgram.class); + static final Log LOG = LogFactory.getLog(RpcProgram.class); public static final int RPCB_PORT = 111; private final String program; private final String host; @@ -45,6 +48,7 @@ public abstract class RpcProgram extends private final int progNumber; private final int lowProgVersion; private final int highProgVersion; + private final boolean allowInsecurePorts; /** * If not null, this will be used as the socket to use to connect to the @@ -61,10 +65,14 @@ public abstract class RpcProgram extends * @param progNumber program number as defined in RFC 1050 * @param lowProgVersion lowest version of the specification supported * @param highProgVersion highest version of the specification supported + * @param DatagramSocket registrationSocket if not null, use this socket to + * register with portmap daemon + * @param allowInsecurePorts true to allow client connections from + * unprivileged ports, false otherwise */ protected RpcProgram(String program, String host, int port, int progNumber, int lowProgVersion, int highProgVersion, - DatagramSocket registrationSocket) { + DatagramSocket registrationSocket, boolean allowInsecurePorts) { this.program = program; this.host = host; this.port = port; @@ -72,6 +80,9 @@ public abstract class RpcProgram extends this.lowProgVersion = lowProgVersion; this.highProgVersion = highProgVersion; this.registrationSocket = registrationSocket; + this.allowInsecurePorts = allowInsecurePorts; + LOG.info("Will " + (allowInsecurePorts ? "" : "not ") + "accept client " + + "connections from unprivileged ports"); } /** @@ -133,43 +144,82 @@ public abstract class RpcProgram extends throws Exception { RpcInfo info = (RpcInfo) e.getMessage(); RpcCall call = (RpcCall) info.header(); + + SocketAddress remoteAddress = info.remoteAddress(); + if (!allowInsecurePorts) { + if (LOG.isDebugEnabled()) { + LOG.debug("Will not allow connections from unprivileged ports. " + + "Checking for valid client port..."); + } + if (remoteAddress instanceof InetSocketAddress) { + InetSocketAddress inetRemoteAddress = (InetSocketAddress) remoteAddress; + if (inetRemoteAddress.getPort() > 1023) { + LOG.warn("Connection attempted from '" + inetRemoteAddress + "' " + + "which is an unprivileged port. Rejecting connection."); + sendRejectedReply(call, remoteAddress, ctx); + return; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Accepting connection from '" + remoteAddress + "'"); + } + } + } else { + LOG.warn("Could not determine remote port of socket address '" + + remoteAddress + "'. Rejecting connection."); + sendRejectedReply(call, remoteAddress, ctx); + return; + } + } + if (LOG.isTraceEnabled()) { LOG.trace(program + " procedure #" + call.getProcedure()); } if (this.progNumber != call.getProgram()) { LOG.warn("Invalid RPC call program " + call.getProgram()); - RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(), - AcceptState.PROG_UNAVAIL, Verifier.VERIFIER_NONE); - - XDR out = new XDR(); - reply.write(out); - ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap() - .buffer()); - RpcResponse rsp = new RpcResponse(b, info.remoteAddress()); - RpcUtil.sendRpcResponse(ctx, rsp); + sendAcceptedReply(call, remoteAddress, AcceptState.PROG_UNAVAIL, ctx); return; } int ver = call.getVersion(); if (ver < lowProgVersion || ver > highProgVersion) { LOG.warn("Invalid RPC call version " + ver); - RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(), - AcceptState.PROG_MISMATCH, Verifier.VERIFIER_NONE); - - XDR out = new XDR(); - reply.write(out); - out.writeInt(lowProgVersion); - out.writeInt(highProgVersion); - ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap() - .buffer()); - RpcResponse rsp = new RpcResponse(b, info.remoteAddress()); - RpcUtil.sendRpcResponse(ctx, rsp); + sendAcceptedReply(call, remoteAddress, AcceptState.PROG_MISMATCH, ctx); return; } handleInternal(ctx, info); } + + private void sendAcceptedReply(RpcCall call, SocketAddress remoteAddress, + AcceptState acceptState, ChannelHandlerContext ctx) { + RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(), + acceptState, Verifier.VERIFIER_NONE); + + XDR out = new XDR(); + reply.write(out); + if (acceptState == AcceptState.PROG_MISMATCH) { + out.writeInt(lowProgVersion); + out.writeInt(highProgVersion); + } + ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap() + .buffer()); + RpcResponse rsp = new RpcResponse(b, remoteAddress); + RpcUtil.sendRpcResponse(ctx, rsp); + } + + private static void sendRejectedReply(RpcCall call, + SocketAddress remoteAddress, ChannelHandlerContext ctx) { + XDR out = new XDR(); + RpcDeniedReply reply = new RpcDeniedReply(call.getXid(), + RpcReply.ReplyState.MSG_DENIED, + RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone()); + reply.write(out); + ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap() + .buffer()); + RpcResponse rsp = new RpcResponse(buf, remoteAddress); + RpcUtil.sendRpcResponse(ctx, rsp); + } protected abstract void handleInternal(ChannelHandlerContext ctx, RpcInfo info); Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java?rev=1595352&r1=1595351&r2=1595352&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java Fri May 16 21:25:05 2014 @@ -28,6 +28,8 @@ import java.util.Random; import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder; import org.apache.hadoop.oncrpc.security.CredentialsNone; import org.apache.hadoop.oncrpc.security.VerifierNone; +import org.apache.log4j.Level; +import org.apache.commons.logging.impl.Log4JLogger; import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; @@ -38,10 +40,16 @@ import org.junit.Test; import org.mockito.Mockito; public class TestFrameDecoder { + + static { + ((Log4JLogger) RpcProgram.LOG).getLogger().setLevel(Level.ALL); + } private static int resultSize; static void testRequest(XDR request, int serverPort) { + // Reset resultSize so as to avoid interference from other tests in this class. + resultSize = 0; SimpleTcpClient tcpClient = new SimpleTcpClient("localhost", serverPort, request, true); tcpClient.run(); @@ -50,9 +58,10 @@ public class TestFrameDecoder { static class TestRpcProgram extends RpcProgram { protected TestRpcProgram(String program, String host, int port, - int progNumber, int lowProgVersion, int highProgVersion) { + int progNumber, int lowProgVersion, int highProgVersion, + boolean allowInsecurePorts) { super(program, host, port, progNumber, lowProgVersion, highProgVersion, - null); + null, allowInsecurePorts); } @Override @@ -149,7 +158,41 @@ public class TestFrameDecoder { @Test public void testFrames() { + int serverPort = startRpcServer(true); + XDR xdrOut = createGetportMount(); + int headerSize = xdrOut.size(); + int bufsize = 2 * 1024 * 1024; + byte[] buffer = new byte[bufsize]; + xdrOut.writeFixedOpaque(buffer); + int requestSize = xdrOut.size() - headerSize; + + // Send the request to the server + testRequest(xdrOut, serverPort); + + // Verify the server got the request with right size + assertEquals(requestSize, resultSize); + } + + @Test + public void testUnprivilegedPort() { + // Don't allow connections from unprivileged ports. Given that this test is + // presumably not being run by root, this will be the case. + int serverPort = startRpcServer(false); + + XDR xdrOut = createGetportMount(); + int bufsize = 2 * 1024 * 1024; + byte[] buffer = new byte[bufsize]; + xdrOut.writeFixedOpaque(buffer); + + // Send the request to the server + testRequest(xdrOut, serverPort); + + // Verify the server rejected the request. + assertEquals(0, resultSize); + } + + private static int startRpcServer(boolean allowInsecurePorts) { Random rand = new Random(); int serverPort = 30000 + rand.nextInt(10000); int retries = 10; // A few retries in case initial choice is in use. @@ -157,7 +200,7 @@ public class TestFrameDecoder { while (true) { try { RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram", - "localhost", serverPort, 100000, 1, 2); + "localhost", serverPort, 100000, 1, 2, allowInsecurePorts); SimpleTcpServer tcpServer = new SimpleTcpServer(serverPort, program, 1); tcpServer.run(); break; // Successfully bound a port, break out. @@ -169,19 +212,7 @@ public class TestFrameDecoder { } } } - - XDR xdrOut = createGetportMount(); - int headerSize = xdrOut.size(); - int bufsize = 2 * 1024 * 1024; - byte[] buffer = new byte[bufsize]; - xdrOut.writeFixedOpaque(buffer); - int requestSize = xdrOut.size() - headerSize; - - // Send the request to the server - testRequest(xdrOut, serverPort); - - // Verify the server got the request with right size - assertEquals(requestSize, resultSize); + return serverPort; } static void createPortmapXDRheader(XDR xdr_out, int procedure) { Added: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/resources/log4j.properties?rev=1595352&view=auto ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/resources/log4j.properties (added) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/resources/log4j.properties Fri May 16 21:25:05 2014 @@ -0,0 +1,18 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# log4j configuration used during build and unit tests + +log4j.rootLogger=info,stdout +log4j.threshhold=ALL +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n