Author: atm Date: Fri Apr 25 00:19:34 2014 New Revision: 1589915 URL: http://svn.apache.org/r1589915 Log: HDFS-6281. Provide option to use the NFS Gateway without having to use the Hadoop portmapper. Contributed by Aaron T. Myers.
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java?rev=1589915&r1=1589914&r2=1589915&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java Fri Apr 25 00:19:34 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.oncrpc; import java.io.IOException; +import java.net.DatagramSocket; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -46,6 +47,12 @@ public abstract class RpcProgram extends private final int highProgVersion; /** + * If not null, this will be used as the socket to use to connect to the + * system portmap daemon when registering this RPC server program. + */ + private final DatagramSocket registrationSocket; + + /** * Constructor * * @param program program name @@ -56,13 +63,15 @@ public abstract class RpcProgram extends * @param highProgVersion highest version of the specification supported */ protected RpcProgram(String program, String host, int port, int progNumber, - int lowProgVersion, int highProgVersion) { + int lowProgVersion, int highProgVersion, + DatagramSocket registrationSocket) { this.program = program; this.host = host; this.port = port; this.progNumber = progNumber; this.lowProgVersion = lowProgVersion; this.highProgVersion = highProgVersion; + this.registrationSocket = registrationSocket; } /** @@ -105,14 +114,14 @@ public abstract class RpcProgram extends protected void register(PortmapMapping mapEntry, boolean set) { XDR mappingRequest = PortmapRequest.create(mapEntry, set); SimpleUdpClient registrationClient = new SimpleUdpClient(host, RPCB_PORT, - mappingRequest); + mappingRequest, registrationSocket); try { registrationClient.run(); } catch (IOException e) { String request = set ? "Registration" : "Unregistration"; LOG.error(request + " failure with " + host + ":" + port - + ", portmap entry: " + mapEntry); - throw new RuntimeException(request + " failure"); + + ", portmap entry: " + mapEntry, e); + throw new RuntimeException(request + " failure", e); } } Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java?rev=1589915&r1=1589914&r2=1589915&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java Fri Apr 25 00:19:34 2014 @@ -27,43 +27,56 @@ import java.util.Arrays; * A simple UDP based RPC client which just sends one request to a server. */ public class SimpleUdpClient { + protected final String host; protected final int port; protected final XDR request; protected final boolean oneShot; + protected final DatagramSocket clientSocket; - public SimpleUdpClient(String host, int port, XDR request) { - this(host, port, request, true); + public SimpleUdpClient(String host, int port, XDR request, + DatagramSocket clientSocket) { + this(host, port, request, true, clientSocket); } - public SimpleUdpClient(String host, int port, XDR request, Boolean oneShot) { + public SimpleUdpClient(String host, int port, XDR request, Boolean oneShot, + DatagramSocket clientSocket) { this.host = host; this.port = port; this.request = request; this.oneShot = oneShot; + this.clientSocket = clientSocket; } public void run() throws IOException { - DatagramSocket clientSocket = new DatagramSocket(); InetAddress IPAddress = InetAddress.getByName(host); byte[] sendData = request.getBytes(); byte[] receiveData = new byte[65535]; - - DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, - IPAddress, port); - clientSocket.send(sendPacket); - DatagramPacket receivePacket = new DatagramPacket(receiveData, - receiveData.length); - clientSocket.receive(receivePacket); - - // Check reply status - XDR xdr = new XDR(Arrays.copyOfRange(receiveData, 0, - receivePacket.getLength())); - RpcReply reply = RpcReply.read(xdr); - if (reply.getState() != RpcReply.ReplyState.MSG_ACCEPTED) { - throw new IOException("Request failed: " + reply.getState()); + // Use the provided socket if there is one, else just make a new one. + DatagramSocket socket = this.clientSocket == null ? + new DatagramSocket() : this.clientSocket; + + try { + DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, + IPAddress, port); + socket.send(sendPacket); + DatagramPacket receivePacket = new DatagramPacket(receiveData, + receiveData.length); + socket.receive(receivePacket); + + // Check reply status + XDR xdr = new XDR(Arrays.copyOfRange(receiveData, 0, + receivePacket.getLength())); + RpcReply reply = RpcReply.read(xdr); + if (reply.getState() != RpcReply.ReplyState.MSG_ACCEPTED) { + throw new IOException("Request failed: " + reply.getState()); + } + } finally { + // If the client socket was passed in to this UDP client, it's on the + // caller of this UDP client to close that socket. + if (this.clientSocket == null) { + socket.close(); + } } - - clientSocket.close(); } } Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java?rev=1589915&r1=1589914&r2=1589915&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java Fri Apr 25 00:19:34 2014 @@ -51,7 +51,8 @@ public class TestFrameDecoder { protected TestRpcProgram(String program, String host, int port, int progNumber, int lowProgVersion, int highProgVersion) { - super(program, host, port, progNumber, lowProgVersion, highProgVersion); + super(program, host, port, progNumber, lowProgVersion, highProgVersion, + null); } @Override