This is an automated email from the ASF dual-hosted git repository. cliang pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit a06e35a1979a4be336c4d6be05cceb815637e245 Author: Chen Liang <cli...@apache.org> AuthorDate: Tue Oct 23 14:53:45 2018 -0700 HDFS-13566. Add configurable additional RPC listener to NameNode. Contributed by Chen Liang. --- .../main/java/org/apache/hadoop/ipc/Server.java | 114 +++++++++++++++++++-- .../hadoop/security/SaslPropertiesResolver.java | 4 +- .../test/java/org/apache/hadoop/ipc/TestIPC.java | 53 +++++++++- .../java/org/apache/hadoop/hdfs/DFSUtilClient.java | 96 +++++++++++++++-- .../hadoop/hdfs/client/HdfsClientConfigKeys.java | 5 + .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 3 + .../hadoop/hdfs/server/namenode/NameNode.java | 30 ++++++ .../hdfs/server/namenode/NameNodeRpcServer.java | 16 ++- .../src/main/resources/hdfs-default.xml | 11 ++ .../org/apache/hadoop/hdfs/MiniDFSCluster.java | 51 +++++++++ .../apache/hadoop/hdfs/TestHAAuxiliaryPort.java | 112 ++++++++++++++++++++ 11 files changed, 472 insertions(+), 23 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index c81fcf4..c23390f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -374,6 +374,24 @@ public abstract class Server { } /** + * Returns the SASL qop for the current call, if the current call is + * set, and the SASL negotiation is done. Otherwise return null. Note + * that CurCall is thread local object. So in fact, different handler + * threads will process different CurCall object. + * + * Also, only return for RPC calls, not supported for other protocols. + * @return the QOP of the current connection. + */ + public static String getEstablishedQOP() { + Call call = CurCall.get(); + if (call == null || !(call instanceof RpcCall)) { + return null; + } + RpcCall rpcCall = (RpcCall)call; + return rpcCall.connection.getEstablishedQOP(); + } + + /** * Returns the clientId from the current RPC request */ public static byte[] getClientId() { @@ -452,6 +470,10 @@ public abstract class Server { // maintains the set of client connections and handles idle timeouts private ConnectionManager connectionManager; private Listener listener = null; + // Auxiliary listeners maintained as in a map, to allow + // arbitrary number of of auxiliary listeners. A map from + // the port to the listener binding to it. + private Map<Integer, Listener> auxiliaryListenerMap; private Responder responder = null; private Handler[] handlers = null; @@ -1142,11 +1164,12 @@ public abstract class Server { private Reader[] readers = null; private int currentReader = 0; private InetSocketAddress address; //the address we bind at + private int listenPort; //the port we bind at private int backlogLength = conf.getInt( CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY, CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT); - public Listener() throws IOException { + Listener(int port) throws IOException { address = new InetSocketAddress(bindAddress, port); // Create a new server socket and set to non blocking mode acceptChannel = ServerSocketChannel.open(); @@ -1154,7 +1177,10 @@ public abstract class Server { // Bind the server socket to the local host and port bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig); - port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port + //Could be an ephemeral port + this.listenPort = acceptChannel.socket().getLocalPort(); + Thread.currentThread().setName("Listener at " + + bindAddress + "/" + this.listenPort); // create a selector; selector= Selector.open(); readers = new Reader[readThreads]; @@ -1337,7 +1363,7 @@ public abstract class Server { channel.socket().setKeepAlive(true); Reader reader = getReader(); - Connection c = connectionManager.register(channel); + Connection c = connectionManager.register(channel, this.listenPort); // If the connectionManager can't take it, close the connection. if (c == null) { if (channel.isOpen()) { @@ -1759,6 +1785,7 @@ public abstract class Server { private ByteBuffer unwrappedDataLengthBuffer; private int serviceClass; private boolean shouldClose = false; + private int ingressPort; UserGroupInformation user = null; public UserGroupInformation attemptingUser = null; // user name before auth @@ -1770,7 +1797,8 @@ public abstract class Server { private boolean sentNegotiate = false; private boolean useWrap = false; - public Connection(SocketChannel channel, long lastContact) { + public Connection(SocketChannel channel, long lastContact, + int ingressPort) { this.channel = channel; this.lastContact = lastContact; this.data = null; @@ -1779,6 +1807,7 @@ public abstract class Server { this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4); this.socket = channel.socket(); this.addr = socket.getInetAddress(); + this.ingressPort = ingressPort; if (addr == null) { this.hostAddress = "*Unknown*"; } else { @@ -1813,9 +1842,24 @@ public abstract class Server { return hostAddress; } + public int getIngressPort() { + return ingressPort; + } + public InetAddress getHostInetAddress() { return addr; } + + public String getEstablishedQOP() { + // In practice, saslServer should not be null when this is + // called. If it is null, it must be either some + // configuration mistake or it is called from unit test. + if (saslServer == null) { + LOG.warn("SASL server should not be null!"); + return null; + } + return (String)saslServer.getNegotiatedProperty(Sasl.QOP); + } public void setLastContact(long lastContact) { this.lastContact = lastContact; @@ -2223,7 +2267,7 @@ public abstract class Server { private SaslServer createSaslServer(AuthMethod authMethod) throws IOException, InterruptedException { final Map<String,?> saslProps = - saslPropsResolver.getServerProperties(addr); + saslPropsResolver.getServerProperties(addr, ingressPort); return new SaslRpcServer(authMethod).create(this, saslProps, secretManager); } @@ -2739,7 +2783,8 @@ public abstract class Server { private class Handler extends Thread { public Handler(int instanceNumber) { this.setDaemon(true); - this.setName("IPC Server handler "+ instanceNumber + " on " + port); + this.setName("IPC Server handler "+ instanceNumber + + " on default port " + port); } @Override @@ -2900,6 +2945,7 @@ public abstract class Server { this.rpcRequestClass = rpcRequestClass; this.handlerCount = handlerCount; this.socketSendBufferSize = 0; + this.auxiliaryListenerMap = null; this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT); if (queueSizePerHandler != -1) { @@ -2939,8 +2985,9 @@ public abstract class Server { this.negotiateResponse = buildNegotiateResponse(enabledAuthMethods); // Start the listener here and let it bind to the port - listener = new Listener(); - this.port = listener.getAddress().getPort(); + listener = new Listener(port); + // set the server port to the default listener port. + this.port = listener.getAddress().getPort(); connectionManager = new ConnectionManager(); this.rpcMetrics = RpcMetrics.create(this, conf); this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port); @@ -2962,7 +3009,23 @@ public abstract class Server { this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class); } - + + public synchronized void addAuxiliaryListener(int auxiliaryPort) + throws IOException { + if (auxiliaryListenerMap == null) { + auxiliaryListenerMap = new HashMap<>(); + } + if (auxiliaryListenerMap.containsKey(auxiliaryPort) && auxiliaryPort != 0) { + throw new IOException( + "There is already a listener binding to: " + auxiliaryPort); + } + Listener newListener = new Listener(auxiliaryPort); + // in the case of port = 0, the listener would be on a != 0 port. + LOG.info("Adding a server listener on port " + + newListener.getAddress().getPort()); + auxiliaryListenerMap.put(newListener.getAddress().getPort(), newListener); + } + private RpcSaslProto buildNegotiateResponse(List<AuthMethod> authMethods) throws IOException { RpcSaslProto.Builder negotiateBuilder = RpcSaslProto.newBuilder(); @@ -3199,6 +3262,12 @@ public abstract class Server { public synchronized void start() { responder.start(); listener.start(); + if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) { + for (Listener newListener : auxiliaryListenerMap.values()) { + newListener.start(); + } + } + handlers = new Handler[handlerCount]; for (int i = 0; i < handlerCount; i++) { @@ -3220,6 +3289,12 @@ public abstract class Server { } listener.interrupt(); listener.doStop(); + if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) { + for (Listener newListener : auxiliaryListenerMap.values()) { + newListener.interrupt(); + newListener.doStop(); + } + } responder.interrupt(); notifyAll(); this.rpcMetrics.shutdown(); @@ -3243,6 +3318,23 @@ public abstract class Server { public synchronized InetSocketAddress getListenerAddress() { return listener.getAddress(); } + + /** + * Return the set of all the configured auxiliary socket addresses NameNode + * RPC is listening on. If there are none, or it is not configured at all, an + * empty set is returned. + * @return the set of all the auxiliary addresses on which the + * RPC server is listening on. + */ + public synchronized Set<InetSocketAddress> getAuxiliaryListenerAddresses() { + Set<InetSocketAddress> allAddrs = new HashSet<>(); + if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) { + for (Listener auxListener : auxiliaryListenerMap.values()) { + allAddrs.add(auxListener.getAddress()); + } + } + return allAddrs; + } /** * Called for each call. @@ -3547,11 +3639,11 @@ public abstract class Server { return connections.toArray(new Connection[0]); } - Connection register(SocketChannel channel) { + Connection register(SocketChannel channel, int ingressPort) { if (isFull()) { return null; } - Connection connection = new Connection(channel, Time.now()); + Connection connection = new Connection(channel, Time.now(), ingressPort); add(connection); if (LOG.isDebugEnabled()) { LOG.debug("Server connection from " + connection + diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java index 64b86e3..dd6c42e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java @@ -102,7 +102,7 @@ public class SaslPropertiesResolver implements Configurable{ */ public Map<String, String> getServerProperties(InetAddress clientAddress, int ingressPort){ - return properties; + return getServerProperties(clientAddress); } /** @@ -122,7 +122,7 @@ public class SaslPropertiesResolver implements Configurable{ */ public Map<String, String> getClientProperties(InetAddress serverAddress, int ingressPort) { - return properties; + return getClientProperties(serverAddress); } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 95e76f7..9e42690 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -49,8 +49,10 @@ import java.net.SocketException; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Random; +import java.util.Set; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; @@ -171,6 +173,11 @@ public class TestIPC { this(handlerCount, sleep, LongWritable.class, null); } + public TestServer(int port, int handlerCount, boolean sleep) + throws IOException { + this(port, handlerCount, sleep, LongWritable.class, null); + } + public TestServer(int handlerCount, boolean sleep, Configuration conf) throws IOException { this(handlerCount, sleep, LongWritable.class, null, conf); @@ -182,11 +189,24 @@ public class TestIPC { this(handlerCount, sleep, paramClass, responseClass, conf); } + public TestServer(int port, int handlerCount, boolean sleep, + Class<? extends Writable> paramClass, + Class<? extends Writable> responseClass) throws IOException { + this(port, handlerCount, sleep, paramClass, responseClass, conf); + } + public TestServer(int handlerCount, boolean sleep, Class<? extends Writable> paramClass, Class<? extends Writable> responseClass, Configuration conf) throws IOException { - super(ADDRESS, 0, paramClass, handlerCount, conf); + this(0, handlerCount, sleep, paramClass, responseClass, conf); + } + + public TestServer(int port, int handlerCount, boolean sleep, + Class<? extends Writable> paramClass, + Class<? extends Writable> responseClass, Configuration conf) + throws IOException { + super(ADDRESS, port, paramClass, handlerCount, conf); this.sleep = sleep; this.responseClass = responseClass; } @@ -338,6 +358,37 @@ public class TestIPC { } server.stop(); } + + @Test + public void testAuxiliaryPorts() throws IOException, InterruptedException { + int defaultPort = 9000; + int[] auxiliaryPorts = {9001, 9002, 9003}; + final int handlerCount = 5; + final boolean handlerSleep = false; + Server server = new TestServer(defaultPort, handlerCount, handlerSleep); + for (int port : auxiliaryPorts) { + server.addAuxiliaryListener(port); + } + Set<InetSocketAddress> listenerAddrs = + server.getAuxiliaryListenerAddresses(); + Set<InetSocketAddress> addrs = new HashSet<>(); + for (InetSocketAddress addr : listenerAddrs) { + addrs.add(NetUtils.getConnectAddress(addr)); + } + server.start(); + + Client client = new Client(LongWritable.class, conf); + Set<SerialCaller> calls = new HashSet<>(); + for (InetSocketAddress addr : addrs) { + calls.add(new SerialCaller(client, addr, 100)); + } + for (SerialCaller caller : calls) { + caller.join(); + assertFalse(caller.failed); + } + client.stop(); + server.stop(); + } @Test(timeout=60000) public void testStandAloneClient() throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java index 15c9b84..4be4c82 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java @@ -17,9 +17,6 @@ */ package org.apache.hadoop.hdfs; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES; - import java.io.IOException; import java.io.InterruptedIOException; import java.io.UnsupportedEncodingException; @@ -31,6 +28,7 @@ import java.net.URISyntaxException; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; @@ -41,6 +39,9 @@ import java.util.Map; import javax.net.SocketFactory; +import com.google.common.base.Joiner; +import com.google.common.collect.Maps; +import com.google.common.primitives.SignedBytes; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -72,9 +73,8 @@ import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Joiner; -import com.google.common.collect.Maps; -import com.google.common.primitives.SignedBytes; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.*; + public class DFSUtilClient { public static final byte[] EMPTY_BYTES = {}; @@ -162,7 +162,7 @@ public class DFSUtilClient { public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses( Configuration conf) { return DFSUtilClient.getAddresses(conf, null, - HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY); + DFS_NAMENODE_RPC_ADDRESS_KEY); } /** @@ -365,7 +365,7 @@ public class DFSUtilClient { Map<String, InetSocketAddress> ret = Maps.newLinkedHashMap(); for (String nnId : emptyAsSingletonNull(nnIds)) { String suffix = concatSuffixes(nsId, nnId); - String address = getConfValue(defaultValue, suffix, conf, keys); + String address = checkKeysAndProcess(defaultValue, suffix, conf, keys); if (address != null) { InetSocketAddress isa = NetUtils.createSocketAddr(address); if (isa.isUnresolved()) { @@ -380,6 +380,86 @@ public class DFSUtilClient { } /** + * Return address from configuration. Take a list of keys as preference. + * If the address to be returned is the value of DFS_NAMENODE_RPC_ADDRESS_KEY, + * will check to see if auxiliary ports are enabled. If so, call to replace + * address port with auxiliary port. If the address is not the value of + * DFS_NAMENODE_RPC_ADDRESS_KEY, return the original address. If failed to + * find any address, return the given default value. + * + * @param defaultValue the default value if no values found for given keys + * @param suffix suffix to append to keys + * @param conf the configuration + * @param keys a list of keys, ordered by preference + * @return + */ + private static String checkKeysAndProcess(String defaultValue, String suffix, + Configuration conf, String... keys) { + String succeededKey = null; + String address = null; + for (String key : keys) { + address = getConfValue(null, suffix, conf, key); + if (address != null) { + succeededKey = key; + break; + } + } + String ret; + if (address == null) { + ret = defaultValue; + } else if(DFS_NAMENODE_RPC_ADDRESS_KEY.equals(succeededKey)) { + ret = checkRpcAuxiliary(conf, suffix, address); + } else { + ret = address; + } + return ret; + } + + /** + * Check if auxiliary port is enabled, if yes, check if the given address + * should have its port replaced by an auxiliary port. If the given address + * does not contain a port, append the auxiliary port to enforce using it. + * + * @param conf configuration. + * @param address the address to check and modify (if needed). + * @return the new modified address containing auxiliary port, or original + * address if auxiliary port not enabled. + */ + private static String checkRpcAuxiliary(Configuration conf, String suffix, + String address) { + String key = DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY; + key = addSuffix(key, suffix); + int[] ports = conf.getInts(key); + if (ports == null || ports.length == 0) { + return address; + } + LOG.info("Using server auxiliary ports " + Arrays.toString(ports)); + URI uri; + try { + uri = new URI(address); + } catch (URISyntaxException e) { + // return the original address untouched if it is not a valid URI. This + // happens in unit test, as MiniDFSCluster sets the value to + // 127.0.0.1:0, without schema (i.e. "hdfs://"). While in practice, this + // should not be the case. So log a warning message here. + LOG.warn("NameNode address is not a valid uri:" + address); + return address; + } + // Ignore the port, only take the schema(e.g. hdfs) and host (e.g. + // localhost), then append port + // TODO : revisit if there is a better way + StringBuilder sb = new StringBuilder(); + sb.append(uri.getScheme()); + sb.append("://"); + sb.append(uri.getHost()); + sb.append(":"); + // TODO : currently, only the very first auxiliary port is being used. + // But actually NN supports running multiple auxiliary + sb.append(ports[0]); + return sb.toString(); + } + + /** * Given a list of keys in the order of preference, returns a value * for the key in the given order from the configuration. * @param defaultValue default value to return, when key was not found diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 4bc9b3e..c74ece2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -69,6 +69,11 @@ public interface HdfsClientConfigKeys { String DFS_NAMESERVICES = "dfs.nameservices"; String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address"; int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070; + + String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_SUFFIX = "auxiliary-ports"; + String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY = DFS_NAMENODE_RPC_ADDRESS_KEY + + "." + DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_SUFFIX; + String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address"; int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 50470; String DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 41cc8e6..e973bd3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1054,6 +1054,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final Class<DFSNetworkTopology> DFS_NET_TOPOLOGY_IMPL_DEFAULT = DFSNetworkTopology.class; + public static final String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY = + HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY; + // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry @Deprecated public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index efff782..b44d104 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException; @@ -1062,6 +1063,14 @@ public class NameNode extends ReconfigurableBase implements } /** + * @return The auxiliary nameNode RPC addresses, or empty set if there + * is none. + */ + public Set<InetSocketAddress> getAuxiliaryNameNodeAddresses() { + return rpcServer.getAuxiliaryRpcAddresses(); + } + + /** * @return NameNode RPC address in "host:port" string form */ public String getNameNodeAddressHostPortString() { @@ -1069,6 +1078,27 @@ public class NameNode extends ReconfigurableBase implements } /** + * Return a host:port format string corresponds to an auxiliary + * port configured on NameNode. If there are multiple auxiliary ports, + * an arbitrary one is returned. If there is no auxiliary listener, returns + * null. + * + * @return a string of format host:port that points to an auxiliary NameNode + * address, or null if there is no such address. + */ + @VisibleForTesting + public String getNNAuxiliaryRpcAddress() { + Set<InetSocketAddress> auxiliaryAddrs = getAuxiliaryNameNodeAddresses(); + if (auxiliaryAddrs.isEmpty()) { + return null; + } + // since set has no particular order, returning the first element of + // from the iterator is effectively arbitrary. + InetSocketAddress addr = auxiliaryAddrs.iterator().next(); + return NetUtils.getHostPortString(addr); + } + + /** * @return NameNode service RPC address if configured, the * NameNode RPC address otherwise */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 0ef1343..aea811d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -26,6 +26,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH; @@ -519,6 +520,13 @@ public class NameNodeRpcServer implements NamenodeProtocols { if (lifelineRpcServer != null) { lifelineRpcServer.setTracer(nn.tracer); } + int[] auxiliaryPorts = + conf.getInts(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY); + if (auxiliaryPorts != null && auxiliaryPorts.length != 0) { + for (int auxiliaryPort : auxiliaryPorts) { + this.clientRpcServer.addAuxiliaryListener(auxiliaryPort); + } + } } /** Allow access to the lifeline RPC server for testing */ @@ -588,10 +596,16 @@ public class NameNodeRpcServer implements NamenodeProtocols { return serviceRPCAddress; } - InetSocketAddress getRpcAddress() { + @VisibleForTesting + public InetSocketAddress getRpcAddress() { return clientRpcAddress; } + @VisibleForTesting + public Set<InetSocketAddress> getAuxiliaryRpcAddresses() { + return clientRpcServer.getAuxiliaryListenerAddresses(); + } + private static UserGroupInformation getRemoteUser() throws IOException { return NameNode.getRemoteUser(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index aa97ec5..0a3b2d4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4545,4 +4545,15 @@ ensure that other waiters on the lock can get in. </description> </property> + + <property> + <name>dfs.namenode.rpc-address.auxiliary-ports</name> + <value></value> + <description> + A comma separated list of auxiliary ports for the NameNode to listen on. + This allows exposing multiple NN addresses to clients. + Particularly, it is used to enforce different SASL levels on different ports. + Empty list indicates that auxiliary ports are disabled. + </description> + </property> </configuration> diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 2ca8abc..ca28874 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -1359,6 +1359,21 @@ public class MiniDFSCluster implements AutoCloseable { } return uri; } + + URI getURIForAuxiliaryPort(int nnIndex) { + String hostPort = + getNN(nnIndex).nameNode.getNNAuxiliaryRpcAddress(); + if (hostPort == null) { + throw new RuntimeException("No auxiliary port found"); + } + URI uri = null; + try { + uri = new URI("hdfs://" + hostPort); + } catch (URISyntaxException e) { + NameNode.LOG.warn("unexpected URISyntaxException", e); + } + return uri; + } public int getInstanceId() { return instanceId; @@ -1913,6 +1928,14 @@ public class MiniDFSCluster implements AutoCloseable { checkSingleNameNode(); return getNameNodePort(0); } + + /** + * Get the auxiliary port of NameNode, NameNode specified by index. + */ + public int getNameNodeAuxiliaryPort() { + checkSingleNameNode(); + return getNameNodeAuxiliaryPort(0); + } /** * Gets the rpc port used by the NameNode at the given index, because the @@ -1923,6 +1946,22 @@ public class MiniDFSCluster implements AutoCloseable { } /** + * Gets the rpc port used by the NameNode at the given index, if the + * NameNode has multiple auxiliary ports configured, a arbitrary + * one is returned. + */ + public int getNameNodeAuxiliaryPort(int nnIndex) { + Set<InetSocketAddress> allAuxiliaryAddresses = + getNN(nnIndex).nameNode.getAuxiliaryNameNodeAddresses(); + if (allAuxiliaryAddresses.isEmpty()) { + return -1; + } else { + InetSocketAddress addr = allAuxiliaryAddresses.iterator().next(); + return addr.getPort(); + } + } + + /** * @return the service rpc port used by the NameNode at the given index. */ public int getNameNodeServicePort(int nnIndex) { @@ -2502,6 +2541,12 @@ public class MiniDFSCluster implements AutoCloseable { return getFileSystem(0); } + public DistributedFileSystem getFileSystemFromAuxiliaryPort() + throws IOException { + checkSingleNameNode(); + return getFileSystemFromAuxiliaryPort(0); + } + /** * Get a client handle to the DFS cluster for the namenode at given index. */ @@ -2510,6 +2555,12 @@ public class MiniDFSCluster implements AutoCloseable { getNN(nnIndex).conf)); } + public DistributedFileSystem getFileSystemFromAuxiliaryPort(int nnIndex) + throws IOException { + return (DistributedFileSystem) addFileSystem(FileSystem.get( + getURIForAuxiliaryPort(nnIndex), getNN(nnIndex).conf)); + } + /** * Get another FileSystem instance that is different from FileSystem.get(conf). * This simulating different threads working on different FileSystem instances. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java new file mode 100644 index 0000000..867fbac --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer; +import org.junit.Test; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +/** + * Test NN auxiliary port with HA. + */ +public class TestHAAuxiliaryPort { + @Test + public void testTest() throws Exception { + Configuration conf = new Configuration(); + conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, "0,0"); + conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY + ".ha-nn-uri-0.nn1", + "9000,9001"); + conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY + ".ha-nn-uri-0.nn2", + "9000,9001"); + conf.set(DFS_NAMESERVICES, "ha-nn-uri-0"); + conf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".ha-nn-uri-0", "nn1,nn2"); + conf.setBoolean("fs.hdfs.impl.disable.cache", true); + + MiniDFSNNTopology topology = new MiniDFSNNTopology() + .addNameservice(new MiniDFSNNTopology.NSConf("ha-nn-uri-0") + .addNN(new MiniDFSNNTopology.NNConf("nn1")) + .addNN(new MiniDFSNNTopology.NNConf("nn2"))); + + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(topology) + .numDataNodes(0) + .build(); + cluster.transitionToActive(0); + cluster.waitActive(); + + NameNode nn0 = cluster.getNameNode(0); + NameNode nn1 = cluster.getNameNode(1); + + // all the addresses below are valid nn0 addresses + NameNodeRpcServer rpcServer0 = (NameNodeRpcServer)nn0.getRpcServer(); + InetSocketAddress server0RpcAddress = rpcServer0.getRpcAddress(); + Set<InetSocketAddress> auxAddrServer0 = + rpcServer0.getAuxiliaryRpcAddresses(); + assertEquals(2, auxAddrServer0.size()); + + // all the addresses below are valid nn1 addresses + NameNodeRpcServer rpcServer1 = (NameNodeRpcServer)nn1.getRpcServer(); + InetSocketAddress server1RpcAddress = rpcServer1.getRpcAddress(); + Set<InetSocketAddress> auxAddrServer1 = + rpcServer1.getAuxiliaryRpcAddresses(); + assertEquals(2, auxAddrServer1.size()); + + // mkdir on nn0 uri 0 + URI nn0URI = new URI("hdfs://localhost:" + + server0RpcAddress.getPort()); + try (DFSClient client0 = new DFSClient(nn0URI, conf)){ + client0.mkdirs("/test", null, true); + // should be available on other ports also + for (InetSocketAddress auxAddr : auxAddrServer0) { + nn0URI = new URI("hdfs://localhost:" + auxAddr.getPort()); + try (DFSClient clientTmp = new DFSClient(nn0URI, conf)) { + assertTrue(clientTmp.exists("/test")); + } + } + } + + // now perform a failover + cluster.shutdownNameNode(0); + cluster.transitionToActive(1); + + // then try to read the file from the nn1 + URI nn1URI = new URI("hdfs://localhost:" + + server1RpcAddress.getPort()); + try (DFSClient client1 = new DFSClient(nn1URI, conf)) { + assertTrue(client1.exists("/test")); + // should be available on other ports also + for (InetSocketAddress auxAddr : auxAddrServer1) { + nn1URI = new URI("hdfs://localhost:" + auxAddr.getPort()); + try (DFSClient clientTmp = new DFSClient(nn1URI, conf)) { + assertTrue(client1.exists("/test")); + } + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org