This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit a06e35a1979a4be336c4d6be05cceb815637e245
Author: Chen Liang <cli...@apache.org>
AuthorDate: Tue Oct 23 14:53:45 2018 -0700

    HDFS-13566. Add configurable additional RPC listener to NameNode. Contributed by Chen Liang.
---
 .../main/java/org/apache/hadoop/ipc/Server.java    | 114 +++++++++++++++++++--
 .../hadoop/security/SaslPropertiesResolver.java    |   4 +-
 .../test/java/org/apache/hadoop/ipc/TestIPC.java   |  53 +++++++++-
 .../java/org/apache/hadoop/hdfs/DFSUtilClient.java |  96 +++++++++++++++--
 .../hadoop/hdfs/client/HdfsClientConfigKeys.java   |   5 +
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   3 +
 .../hadoop/hdfs/server/namenode/NameNode.java      |  30 ++++++
 .../hdfs/server/namenode/NameNodeRpcServer.java    |  16 ++-
 .../src/main/resources/hdfs-default.xml            |  11 ++
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java     |  51 +++++++++
 .../apache/hadoop/hdfs/TestHAAuxiliaryPort.java    | 112 ++++++++++++++++++++
 11 files changed, 472 insertions(+), 23 deletions(-)
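
Before the per-file hunks, a minimal sketch of how the new knob is meant to be turned on; the host name and ports below are assumed values, and only the auxiliary-ports property is new in this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    // Enable two auxiliary NameNode RPC listeners next to the default one.
    Configuration conf = new HdfsConfiguration();
    conf.set("dfs.namenode.rpc-address", "nn.example.com:9000");
    conf.set("dfs.namenode.rpc-address.auxiliary-ports", "9001,9002");

The NameNode then serves the same RPC protocols on ports 9000, 9001 and 9002.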

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index c81fcf4..c23390f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -374,6 +374,24 @@ public abstract class Server {
   }
 
   /**
+   * Returns the SASL QOP for the current call, if the current call is
+   * set and the SASL negotiation is complete; otherwise returns null.
+   * Note that CurCall is a thread-local object, so different handler
+   * threads process different CurCall objects.
+   *
+   * Also, this only applies to RPC calls; other protocols are not supported.
+   * @return the QOP of the current connection, or null.
+   */
+  public static String getEstablishedQOP() {
+    Call call = CurCall.get();
+    if (!(call instanceof RpcCall)) {
+      return null;
+    }
+    RpcCall rpcCall = (RpcCall)call;
+    return rpcCall.connection.getEstablishedQOP();
+  }
+
+  /**
    * Returns the clientId from the current RPC request
    */
   public static byte[] getClientId() {
@@ -452,6 +470,10 @@ public abstract class Server {
   // maintains the set of client connections and handles idle timeouts
   private ConnectionManager connectionManager;
   private Listener listener = null;
+  // Auxiliary listeners maintained in a map, to allow an
+  // arbitrary number of auxiliary listeners. A map from
+  // the port to the listener bound to it.
+  private Map<Integer, Listener> auxiliaryListenerMap;
   private Responder responder = null;
   private Handler[] handlers = null;
 
@@ -1142,11 +1164,12 @@ public abstract class Server {
     private Reader[] readers = null;
     private int currentReader = 0;
     private InetSocketAddress address; //the address we bind at
+    private int listenPort; //the port we bind at
     private int backlogLength = conf.getInt(
         CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
         CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
     
-    public Listener() throws IOException {
+    Listener(int port) throws IOException {
       address = new InetSocketAddress(bindAddress, port);
       // Create a new server socket and set to non blocking mode
       acceptChannel = ServerSocketChannel.open();
@@ -1154,7 +1177,10 @@ public abstract class Server {
 
       // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
-      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
+      //Could be an ephemeral port
+      this.listenPort = acceptChannel.socket().getLocalPort();
+      Thread.currentThread().setName("Listener at " +
+          bindAddress + "/" + this.listenPort);
       // create a selector;
       selector= Selector.open();
       readers = new Reader[readThreads];
@@ -1337,7 +1363,7 @@ public abstract class Server {
         channel.socket().setKeepAlive(true);
         
         Reader reader = getReader();
-        Connection c = connectionManager.register(channel);
+        Connection c = connectionManager.register(channel, this.listenPort);
         // If the connectionManager can't take it, close the connection.
         if (c == null) {
           if (channel.isOpen()) {
@@ -1759,6 +1785,7 @@ public abstract class Server {
     private ByteBuffer unwrappedDataLengthBuffer;
     private int serviceClass;
     private boolean shouldClose = false;
+    private int ingressPort;
 
     UserGroupInformation user = null;
     public UserGroupInformation attemptingUser = null; // user name before auth
@@ -1770,7 +1797,8 @@ public abstract class Server {
     private boolean sentNegotiate = false;
     private boolean useWrap = false;
     
-    public Connection(SocketChannel channel, long lastContact) {
+    public Connection(SocketChannel channel, long lastContact,
+        int ingressPort) {
       this.channel = channel;
       this.lastContact = lastContact;
       this.data = null;
@@ -1779,6 +1807,7 @@ public abstract class Server {
       this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
       this.socket = channel.socket();
       this.addr = socket.getInetAddress();
+      this.ingressPort = ingressPort;
       if (addr == null) {
         this.hostAddress = "*Unknown*";
       } else {
@@ -1813,9 +1842,24 @@ public abstract class Server {
       return hostAddress;
     }
 
+    public int getIngressPort() {
+      return ingressPort;
+    }
+
     public InetAddress getHostInetAddress() {
       return addr;
     }
+
+    public String getEstablishedQOP() {
+      // In practice, saslServer should not be null when this is
+      // called. If it is null, it is either a configuration mistake
+      // or the method is being called from a unit test.
+      if (saslServer == null) {
+        LOG.warn("SASL server should not be null!");
+        return null;
+      }
+      return (String)saslServer.getNegotiatedProperty(Sasl.QOP);
+    }
     
     public void setLastContact(long lastContact) {
       this.lastContact = lastContact;
@@ -2223,7 +2267,7 @@ public abstract class Server {
     private SaslServer createSaslServer(AuthMethod authMethod)
         throws IOException, InterruptedException {
       final Map<String,?> saslProps =
-                  saslPropsResolver.getServerProperties(addr);
+                  saslPropsResolver.getServerProperties(addr, ingressPort);
+      return new SaslRpcServer(authMethod).create(this, saslProps, secretManager);
     }
     
@@ -2739,7 +2783,8 @@ public abstract class Server {
   private class Handler extends Thread {
     public Handler(int instanceNumber) {
       this.setDaemon(true);
-      this.setName("IPC Server handler "+ instanceNumber + " on " + port);
+      this.setName("IPC Server handler "+ instanceNumber +
+          " on default port " + port);
     }
 
     @Override
@@ -2900,6 +2945,7 @@ public abstract class Server {
     this.rpcRequestClass = rpcRequestClass; 
     this.handlerCount = handlerCount;
     this.socketSendBufferSize = 0;
+    this.auxiliaryListenerMap = null;
    this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
         CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
     if (queueSizePerHandler != -1) {
@@ -2939,8 +2985,9 @@ public abstract class Server {
     this.negotiateResponse = buildNegotiateResponse(enabledAuthMethods);
     
     // Start the listener here and let it bind to the port
-    listener = new Listener();
-    this.port = listener.getAddress().getPort();    
+    listener = new Listener(port);
+    // set the server port to the default listener port.
+    this.port = listener.getAddress().getPort();
     connectionManager = new ConnectionManager();
     this.rpcMetrics = RpcMetrics.create(this, conf);
     this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
@@ -2962,7 +3009,23 @@ public abstract class Server {
     
     this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);
   }
-  
+
+  public synchronized void addAuxiliaryListener(int auxiliaryPort)
+      throws IOException {
+    if (auxiliaryListenerMap == null) {
+      auxiliaryListenerMap = new HashMap<>();
+    }
+    if (auxiliaryListenerMap.containsKey(auxiliaryPort) && auxiliaryPort != 0) {
+      throw new IOException(
+          "There is already a listener binding to: " + auxiliaryPort);
+    }
+    Listener newListener = new Listener(auxiliaryPort);
+    // in the case of port 0, the listener binds to an ephemeral (nonzero) port.
+    LOG.info("Adding a server listener on port " +
+        newListener.getAddress().getPort());
+    auxiliaryListenerMap.put(newListener.getAddress().getPort(), newListener);
+  }
+
   private RpcSaslProto buildNegotiateResponse(List<AuthMethod> authMethods)
       throws IOException {
     RpcSaslProto.Builder negotiateBuilder = RpcSaslProto.newBuilder();
@@ -3199,6 +3262,12 @@ public abstract class Server {
   public synchronized void start() {
     responder.start();
     listener.start();
+    if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
+      for (Listener newListener : auxiliaryListenerMap.values()) {
+        newListener.start();
+      }
+    }
+
     handlers = new Handler[handlerCount];
     
     for (int i = 0; i < handlerCount; i++) {
@@ -3220,6 +3289,12 @@ public abstract class Server {
     }
     listener.interrupt();
     listener.doStop();
+    if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
+      for (Listener newListener : auxiliaryListenerMap.values()) {
+        newListener.interrupt();
+        newListener.doStop();
+      }
+    }
     responder.interrupt();
     notifyAll();
     this.rpcMetrics.shutdown();
@@ -3243,6 +3318,23 @@ public abstract class Server {
   public synchronized InetSocketAddress getListenerAddress() {
     return listener.getAddress();
   }
+
+  /**
+   * Return the set of all the configured auxiliary socket addresses this
+   * RPC server is listening on. If none are configured, an empty
+   * set is returned.
+   * @return the set of all the auxiliary addresses on which the
+   *         RPC server is listening.
+   */
+  public synchronized Set<InetSocketAddress> getAuxiliaryListenerAddresses() {
+    Set<InetSocketAddress> allAddrs = new HashSet<>();
+    if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
+      for (Listener auxListener : auxiliaryListenerMap.values()) {
+        allAddrs.add(auxListener.getAddress());
+      }
+    }
+    return allAddrs;
+  }
   
   /** 
    * Called for each call. 
@@ -3547,11 +3639,11 @@ public abstract class Server {
       return connections.toArray(new Connection[0]);
     }
 
-    Connection register(SocketChannel channel) {
+    Connection register(SocketChannel channel, int ingressPort) {
       if (isFull()) {
         return null;
       }
-      Connection connection = new Connection(channel, Time.now());
+      Connection connection = new Connection(channel, Time.now(), ingressPort);
       add(connection);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Server connection from " + connection +
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java
index 64b86e3..dd6c42e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java
@@ -102,7 +102,7 @@ public class SaslPropertiesResolver implements Configurable{
    */
   public Map<String, String> getServerProperties(InetAddress clientAddress,
       int ingressPort){
-    return properties;
+    return getServerProperties(clientAddress);
   }
 
   /**
@@ -122,7 +122,7 @@ public class SaslPropertiesResolver implements Configurable{
    */
   public Map<String, String> getClientProperties(InetAddress serverAddress,
       int ingressPort) {
-    return properties;
+    return getClientProperties(serverAddress);
   }
 
   /**
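
The two overloads above deliberately fall back to the port-agnostic lookups, so existing resolvers keep working; a subclass can override them to hand out different SASL properties per ingress port. A minimal sketch, assuming a deployment that wants privacy (auth-conf) on one dedicated, assumed port:

    import java.net.InetAddress;
    import java.util.HashMap;
    import java.util.Map;
    import javax.security.sasl.Sasl;
    import org.apache.hadoop.security.SaslPropertiesResolver;

    public class PortBasedSaslResolver extends SaslPropertiesResolver {
      @Override
      public Map<String, String> getServerProperties(InetAddress clientAddress,
          int ingressPort) {
        // Start from the port-agnostic defaults, then tighten QOP on the
        // dedicated port (9001 is an assumed value).
        Map<String, String> props =
            new HashMap<>(getServerProperties(clientAddress));
        if (ingressPort == 9001) {
          props.put(Sasl.QOP, "auth-conf");
        }
        return props;
      }
    }

Such a resolver would be wired in through the existing hadoop.security.saslproperties.resolver.class key.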
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
index 95e76f7..9e42690 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
@@ -49,8 +49,10 @@ import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
@@ -171,6 +173,11 @@ public class TestIPC {
       this(handlerCount, sleep, LongWritable.class, null);
     }
 
+    public TestServer(int port, int handlerCount, boolean sleep)
+        throws IOException {
+      this(port, handlerCount, sleep, LongWritable.class, null);
+    }
+
     public TestServer(int handlerCount, boolean sleep, Configuration conf)
         throws IOException {
       this(handlerCount, sleep, LongWritable.class, null, conf);
@@ -182,11 +189,24 @@ public class TestIPC {
       this(handlerCount, sleep, paramClass, responseClass, conf);
     }
 
+    public TestServer(int port, int handlerCount, boolean sleep,
+        Class<? extends Writable> paramClass,
+        Class<? extends Writable> responseClass) throws IOException {
+      this(port, handlerCount, sleep, paramClass, responseClass, conf);
+    }
+
     public TestServer(int handlerCount, boolean sleep,
         Class<? extends Writable> paramClass,
         Class<? extends Writable> responseClass, Configuration conf)
         throws IOException {
-      super(ADDRESS, 0, paramClass, handlerCount, conf);
+      this(0, handlerCount, sleep, paramClass, responseClass, conf);
+    }
+
+    public TestServer(int port, int handlerCount, boolean sleep,
+        Class<? extends Writable> paramClass,
+        Class<? extends Writable> responseClass, Configuration conf)
+        throws IOException {
+      super(ADDRESS, port, paramClass, handlerCount, conf);
       this.sleep = sleep;
       this.responseClass = responseClass;
     }
@@ -338,6 +358,37 @@ public class TestIPC {
     }
     server.stop();
   }
+
+  @Test
+  public void testAuxiliaryPorts() throws IOException, InterruptedException {
+    int defaultPort = 9000;
+    int[] auxiliaryPorts = {9001, 9002, 9003};
+    final int handlerCount = 5;
+    final boolean handlerSleep = false;
+    Server server = new TestServer(defaultPort, handlerCount, handlerSleep);
+    for (int port : auxiliaryPorts) {
+      server.addAuxiliaryListener(port);
+    }
+    Set<InetSocketAddress> listenerAddrs =
+        server.getAuxiliaryListenerAddresses();
+    Set<InetSocketAddress> addrs = new HashSet<>();
+    for (InetSocketAddress addr : listenerAddrs) {
+      addrs.add(NetUtils.getConnectAddress(addr));
+    }
+    server.start();
+
+    Client client = new Client(LongWritable.class, conf);
+    Set<SerialCaller> calls = new HashSet<>();
+    for (InetSocketAddress addr : addrs) {
+      SerialCaller caller = new SerialCaller(client, addr, 100);
+      // SerialCaller is a Thread; start it before joining below, otherwise
+      // join() returns immediately and the test passes vacuously.
+      caller.start();
+      calls.add(caller);
+    }
+    for (SerialCaller caller : calls) {
+      caller.join();
+      assertFalse(caller.failed);
+    }
+    client.stop();
+    server.stop();
+  }
        
   @Test(timeout=60000)
   public void testStandAloneClient() throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 15c9b84..4be4c82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -17,9 +17,6 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.UnsupportedEncodingException;
@@ -31,6 +28,7 @@ import java.net.URISyntaxException;
 import java.nio.channels.SocketChannel;
 import java.nio.charset.StandardCharsets;
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
@@ -41,6 +39,9 @@ import java.util.Map;
 
 import javax.net.SocketFactory;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.SignedBytes;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -72,9 +73,8 @@ import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Maps;
-import com.google.common.primitives.SignedBytes;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.*;
+
 
 public class DFSUtilClient {
   public static final byte[] EMPTY_BYTES = {};
@@ -162,7 +162,7 @@ public class DFSUtilClient {
  public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(
       Configuration conf) {
     return DFSUtilClient.getAddresses(conf, null,
-      HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
+      DFS_NAMENODE_RPC_ADDRESS_KEY);
   }
 
   /**
@@ -365,7 +365,7 @@ public class DFSUtilClient {
     Map<String, InetSocketAddress> ret = Maps.newLinkedHashMap();
     for (String nnId : emptyAsSingletonNull(nnIds)) {
       String suffix = concatSuffixes(nsId, nnId);
-      String address = getConfValue(defaultValue, suffix, conf, keys);
+      String address = checkKeysAndProcess(defaultValue, suffix, conf, keys);
       if (address != null) {
         InetSocketAddress isa = NetUtils.createSocketAddr(address);
         if (isa.isUnresolved()) {
@@ -380,6 +380,86 @@ public class DFSUtilClient {
   }
 
   /**
+   * Return an address from the configuration. Take a list of keys in order
+   * of preference. If the address to be returned is the value of
+   * DFS_NAMENODE_RPC_ADDRESS_KEY, check whether auxiliary ports are enabled;
+   * if so, replace the address port with an auxiliary port. If the address is
+   * not the value of DFS_NAMENODE_RPC_ADDRESS_KEY, return the original
+   * address. If no address is found, return the given default value.
+   *
+   * @param defaultValue the default value if no value is found for the keys
+   * @param suffix suffix to append to keys
+   * @param conf the configuration
+   * @param keys a list of keys, ordered by preference
+   * @return the resolved address, or the default value if none is found
+   */
+  private static String checkKeysAndProcess(String defaultValue, String suffix,
+      Configuration conf, String... keys) {
+    String succeededKey = null;
+    String address = null;
+    for (String key : keys) {
+      address = getConfValue(null, suffix, conf, key);
+      if (address != null) {
+        succeededKey = key;
+        break;
+      }
+    }
+    String ret;
+    if (address == null) {
+      ret = defaultValue;
+    } else if (DFS_NAMENODE_RPC_ADDRESS_KEY.equals(succeededKey)) {
+      ret = checkRpcAuxiliary(conf, suffix, address);
+    } else {
+      ret = address;
+    }
+    return ret;
+  }
+
+  /**
+   * Check whether auxiliary ports are enabled; if so, check whether the given
+   * address should have its port replaced by an auxiliary port. If the given
+   * address has no port, append the auxiliary port to enforce using it.
+   *
+   * @param conf configuration.
+   * @param suffix suffix to append to the auxiliary ports key.
+   * @param address the address to check and modify (if needed).
+   * @return the modified address containing the auxiliary port, or the
+   * original address if auxiliary ports are not enabled.
+   */
+  private static String checkRpcAuxiliary(Configuration conf, String suffix,
+      String address) {
+    String key = DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
+    key = addSuffix(key, suffix);
+    int[] ports = conf.getInts(key);
+    if (ports == null || ports.length == 0) {
+      return address;
+    }
+    LOG.info("Using server auxiliary ports " + Arrays.toString(ports));
+    URI uri;
+    try {
+      uri = new URI(address);
+    } catch (URISyntaxException e) {
+      // return the original address untouched if it is not a valid URI. This
+      // happens in unit tests, as MiniDFSCluster sets the value to
+      // 127.0.0.1:0, without a scheme (i.e. "hdfs://"). In practice, this
+      // should not be the case, so log a warning message here.
+      LOG.warn("NameNode address is not a valid URI: " + address);
+      return address;
+    }
+    // Ignore the port; only take the scheme (e.g. hdfs) and host (e.g.
+    // localhost), then append the auxiliary port.
+    // TODO : revisit if there is a better way
+    StringBuilder sb = new StringBuilder();
+    sb.append(uri.getScheme());
+    sb.append("://");
+    sb.append(uri.getHost());
+    sb.append(":");
+    // TODO : currently, only the very first auxiliary port is being used.
+    // But the NN actually supports running multiple auxiliary ports.
+    sb.append(ports[0]);
+    return sb.toString();
+  }
+
+  /**
    * Given a list of keys in the order of preference, returns a value
    * for the key in the given order from the configuration.
    * @param defaultValue default value to return, when key was not found
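
A worked example of the client-side resolution above, with assumed values; as the TODO notes, only the first auxiliary port is used:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    Configuration conf = new HdfsConfiguration();
    conf.set("dfs.namenode.rpc-address", "hdfs://nn.example.com:9000");
    conf.set("dfs.namenode.rpc-address.auxiliary-ports", "9001,9002");
    // checkKeysAndProcess() finds the address under
    // dfs.namenode.rpc-address, and checkRpcAuxiliary() keeps the scheme
    // and host but swaps in the first auxiliary port, yielding
    // "hdfs://nn.example.com:9001".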
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 4bc9b3e..c74ece2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -69,6 +69,11 @@ public interface HdfsClientConfigKeys {
   String  DFS_NAMESERVICES = "dfs.nameservices";
   String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
   int     DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
+
+  String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_SUFFIX = "auxiliary-ports";
+  String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY = DFS_NAMENODE_RPC_ADDRESS_KEY
+      + "." + DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_SUFFIX;
+
   String  DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
   int     DFS_NAMENODE_HTTPS_PORT_DEFAULT = 50470;
   String  DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 41cc8e6..e973bd3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1054,6 +1054,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final Class<DFSNetworkTopology> DFS_NET_TOPOLOGY_IMPL_DEFAULT =
       DFSNetworkTopology.class;
 
+  public static final String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY =
+      HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
+
   // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry 
   @Deprecated
   public static final String  DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index efff782..b44d104 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -1062,6 +1063,14 @@ public class NameNode extends ReconfigurableBase implements
   }
 
   /**
+   * @return The auxiliary NameNode RPC addresses, or an empty set if there
+   * are none.
+   */
+  public Set<InetSocketAddress> getAuxiliaryNameNodeAddresses() {
+    return rpcServer.getAuxiliaryRpcAddresses();
+  }
+
+  /**
    * @return NameNode RPC address in "host:port" string form
    */
   public String getNameNodeAddressHostPortString() {
@@ -1069,6 +1078,27 @@ public class NameNode extends ReconfigurableBase implements
   }
 
   /**
+   * Return a host:port format string that corresponds to an auxiliary
+   * port configured on the NameNode. If there are multiple auxiliary ports,
+   * an arbitrary one is returned. If there is no auxiliary listener, returns
+   * null.
+   *
+   * @return a string of format host:port that points to an auxiliary NameNode
+   *         address, or null if there is no such address.
+   */
+  @VisibleForTesting
+  public String getNNAuxiliaryRpcAddress() {
+    Set<InetSocketAddress> auxiliaryAddrs = getAuxiliaryNameNodeAddresses();
+    if (auxiliaryAddrs.isEmpty()) {
+      return null;
+    }
+    // since the set has no particular order, returning the first element
+    // from the iterator is effectively arbitrary.
+    InetSocketAddress addr = auxiliaryAddrs.iterator().next();
+    return NetUtils.getHostPortString(addr);
+  }
+
+  /**
    * @return NameNode service RPC address if configured, the
    *    NameNode RPC address otherwise
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 0ef1343..aea811d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -26,6 +26,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH;
 
@@ -519,6 +520,13 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     if (lifelineRpcServer != null) {
       lifelineRpcServer.setTracer(nn.tracer);
     }
+    int[] auxiliaryPorts =
+        conf.getInts(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY);
+    if (auxiliaryPorts != null && auxiliaryPorts.length != 0) {
+      for (int auxiliaryPort : auxiliaryPorts) {
+        this.clientRpcServer.addAuxiliaryListener(auxiliaryPort);
+      }
+    }
   }
 
   /** Allow access to the lifeline RPC server for testing */
@@ -588,10 +596,16 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     return serviceRPCAddress;
   }
 
-  InetSocketAddress getRpcAddress() {
+  @VisibleForTesting
+  public InetSocketAddress getRpcAddress() {
     return clientRpcAddress;
   }
 
+  @VisibleForTesting
+  public Set<InetSocketAddress> getAuxiliaryRpcAddresses() {
+    return clientRpcServer.getAuxiliaryListenerAddresses();
+  }
+
   private static UserGroupInformation getRemoteUser() throws IOException {
     return NameNode.getRemoteUser();
   }
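
What the startup wiring above amounts to, as a hedged sketch with assumed values:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    Configuration conf = new HdfsConfiguration();
    conf.set("dfs.namenode.rpc-address.auxiliary-ports", "9001,9002");
    // Configuration.getInts() parses the comma separated list...
    int[] ports = conf.getInts("dfs.namenode.rpc-address.auxiliary-ports");
    // ...and each entry becomes one clientRpcServer.addAuxiliaryListener()
    // call, i.e. one extra listener on the client RPC server.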
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index aa97ec5..0a3b2d4 100644
index aa97ec5..0a3b2d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4545,4 +4545,15 @@
       ensure that other waiters on the lock can get in.
     </description>
   </property>
+
+  <property>
+    <name>dfs.namenode.rpc-address.auxiliary-ports</name>
+    <value></value>
+    <description>
+      A comma separated list of auxiliary ports for the NameNode to listen on.
+      This allows exposing multiple NN addresses to clients.
+      In particular, it is used to enforce different SASL levels on different
+      ports. An empty list indicates that auxiliary ports are disabled.
+    </description>
+  </property>
 </configuration>
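
With the property set, a client can simply target an auxiliary port, since every listener serves the same client RPC server. A usage sketch with an assumed host and port:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    Configuration conf = new HdfsConfiguration();
    FileSystem fs = FileSystem.get(
        URI.create("hdfs://nn.example.com:9001"), conf);
    fs.listStatus(new Path("/"));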
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 2ca8abc..ca28874 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -1359,6 +1359,21 @@ public class MiniDFSCluster implements AutoCloseable {
     }
     return uri;
   }
+
+  URI getURIForAuxiliaryPort(int nnIndex) {
+    String hostPort =
+        getNN(nnIndex).nameNode.getNNAuxiliaryRpcAddress();
+    if (hostPort == null) {
+      throw new RuntimeException("No auxiliary port found");
+    }
+    URI uri = null;
+    try {
+      uri = new URI("hdfs://" + hostPort);
+    } catch (URISyntaxException e) {
+      NameNode.LOG.warn("unexpected URISyntaxException", e);
+    }
+    return uri;
+  }
   
   public int getInstanceId() {
     return instanceId;
@@ -1913,6 +1928,14 @@ public class MiniDFSCluster implements AutoCloseable {
     checkSingleNameNode();
     return getNameNodePort(0);
   }
+
+  /**
+   * Get the auxiliary port of the NameNode; valid only with a single NameNode.
+   */
+  public int getNameNodeAuxiliaryPort() {
+    checkSingleNameNode();
+    return getNameNodeAuxiliaryPort(0);
+  }
     
   /**
    * Gets the rpc port used by the NameNode at the given index, because the
@@ -1923,6 +1946,22 @@ public class MiniDFSCluster implements AutoCloseable {
   }
 
   /**
+   * Gets an auxiliary RPC port used by the NameNode at the given index. If
+   * the NameNode has multiple auxiliary ports configured, an arbitrary one
+   * is returned; -1 is returned if no auxiliary port is configured.
+   */
+  public int getNameNodeAuxiliaryPort(int nnIndex) {
+    Set<InetSocketAddress> allAuxiliaryAddresses =
+        getNN(nnIndex).nameNode.getAuxiliaryNameNodeAddresses();
+    if (allAuxiliaryAddresses.isEmpty()) {
+      return -1;
+    } else {
+      InetSocketAddress addr = allAuxiliaryAddresses.iterator().next();
+      return addr.getPort();
+    }
+  }
+
+  /**
    * @return the service rpc port used by the NameNode at the given index.
    */     
   public int getNameNodeServicePort(int nnIndex) {
@@ -2502,6 +2541,12 @@ public class MiniDFSCluster implements AutoCloseable {
     return getFileSystem(0);
   }
 
+  public DistributedFileSystem getFileSystemFromAuxiliaryPort()
+      throws IOException {
+    checkSingleNameNode();
+    return getFileSystemFromAuxiliaryPort(0);
+  }
+
   /**
    * Get a client handle to the DFS cluster for the namenode at given index.
    */
@@ -2510,6 +2555,12 @@ public class MiniDFSCluster implements AutoCloseable {
         getNN(nnIndex).conf));
   }
 
+  public DistributedFileSystem getFileSystemFromAuxiliaryPort(int nnIndex)
+      throws IOException {
+    return (DistributedFileSystem) addFileSystem(FileSystem.get(
+        getURIForAuxiliaryPort(nnIndex), getNN(nnIndex).conf));
+  }
+
   /**
   * Get another FileSystem instance that is different from FileSystem.get(conf).
   * This simulates different threads working on different FileSystem instances.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java
new file mode 100644
index 0000000..867fbac
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
+import org.junit.Test;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Test NN auxiliary port with HA.
+ */
+public class TestHAAuxiliaryPort {
+  @Test
+  public void testHAAuxiliaryPort() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, "0,0");
+    conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY + ".ha-nn-uri-0.nn1",
+        "9000,9001");
+    conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY + ".ha-nn-uri-0.nn2",
+        "9000,9001");
+    conf.set(DFS_NAMESERVICES, "ha-nn-uri-0");
+    conf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".ha-nn-uri-0", "nn1,nn2");
+    conf.setBoolean("fs.hdfs.impl.disable.cache", true);
+
+    MiniDFSNNTopology topology = new MiniDFSNNTopology()
+        .addNameservice(new MiniDFSNNTopology.NSConf("ha-nn-uri-0")
+            .addNN(new MiniDFSNNTopology.NNConf("nn1"))
+            .addNN(new MiniDFSNNTopology.NNConf("nn2")));
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(topology)
+        .numDataNodes(0)
+        .build();
+    cluster.transitionToActive(0);
+    cluster.waitActive();
+
+    NameNode nn0 = cluster.getNameNode(0);
+    NameNode nn1 = cluster.getNameNode(1);
+
+    // all the addresses below are valid nn0 addresses
+    NameNodeRpcServer rpcServer0 = (NameNodeRpcServer)nn0.getRpcServer();
+    InetSocketAddress server0RpcAddress = rpcServer0.getRpcAddress();
+    Set<InetSocketAddress> auxAddrServer0 =
+        rpcServer0.getAuxiliaryRpcAddresses();
+    assertEquals(2, auxAddrServer0.size());
+
+    // all the addresses below are valid nn1 addresses
+    NameNodeRpcServer rpcServer1 = (NameNodeRpcServer)nn1.getRpcServer();
+    InetSocketAddress server1RpcAddress = rpcServer1.getRpcAddress();
+    Set<InetSocketAddress> auxAddrServer1 =
+        rpcServer1.getAuxiliaryRpcAddresses();
+    assertEquals(2, auxAddrServer1.size());
+
+    // mkdir on nn0 via its default RPC address
+    URI nn0URI = new URI("hdfs://localhost:" +
+        server0RpcAddress.getPort());
+    try (DFSClient client0 = new DFSClient(nn0URI, conf)){
+      client0.mkdirs("/test", null, true);
+      // should be available on other ports also
+      for (InetSocketAddress auxAddr : auxAddrServer0) {
+        nn0URI = new URI("hdfs://localhost:" + auxAddr.getPort());
+        try (DFSClient clientTmp = new DFSClient(nn0URI, conf)) {
+          assertTrue(clientTmp.exists("/test"));
+        }
+      }
+    }
+
+    // now perform a failover
+    cluster.shutdownNameNode(0);
+    cluster.transitionToActive(1);
+
+    // then check that /test is visible from nn1
+    URI nn1URI = new URI("hdfs://localhost:" +
+        server1RpcAddress.getPort());
+    try (DFSClient client1 = new DFSClient(nn1URI, conf)) {
+      assertTrue(client1.exists("/test"));
+      // should be available on other ports also
+      for (InetSocketAddress auxAddr : auxAddrServer1) {
+        nn1URI = new URI("hdfs://localhost:" + auxAddr.getPort());
+        try (DFSClient clientTmp = new DFSClient(nn1URI, conf)) {
+          assertTrue(clientTmp.exists("/test"));
+        }
+      }
+    }
+  }
+}

