HDFS-11546. Federation Router RPC server. Contributed by Jason Kace and Inigo Goiri.
(cherry picked from commit 8a9cdebebf26841a0f1e99fb08135f4597f2eba2) (cherry picked from commit ca4f209b49e3aad6a80306f7342c9b6b560a79a7) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e6989725 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e6989725 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e6989725 Branch: refs/heads/branch-2 Commit: e69897253daa4749153143935459543e8ecadb6e Parents: 93687da Author: Inigo Goiri <inigo...@apache.org> Authored: Thu May 11 09:57:03 2017 -0700 Committer: vrushali <vrush...@apache.org> Committed: Fri Oct 20 11:22:30 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 38 + .../resolver/FederationNamespaceInfo.java | 46 +- .../federation/resolver/RemoteLocation.java | 46 +- .../federation/router/ConnectionContext.java | 104 + .../federation/router/ConnectionManager.java | 408 ++++ .../federation/router/ConnectionPool.java | 314 +++ .../federation/router/ConnectionPoolId.java | 117 ++ .../router/RemoteLocationContext.java | 38 +- .../server/federation/router/RemoteMethod.java | 164 ++ .../server/federation/router/RemoteParam.java | 71 + .../hdfs/server/federation/router/Router.java | 58 +- .../federation/router/RouterRpcClient.java | 856 ++++++++ .../federation/router/RouterRpcServer.java | 1867 +++++++++++++++++- .../src/main/resources/hdfs-default.xml | 95 + .../server/federation/FederationTestUtils.java | 80 +- .../hdfs/server/federation/MockResolver.java | 90 +- .../server/federation/RouterConfigBuilder.java | 20 +- .../server/federation/RouterDFSCluster.java | 535 +++-- .../server/federation/router/TestRouter.java | 31 +- .../server/federation/router/TestRouterRpc.java | 869 ++++++++ .../router/TestRouterRpcMultiDestination.java | 216 ++ 21 files changed, 5675 insertions(+), 388 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6989725/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 1b66ead..5d6c467 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1012,6 +1012,44 @@ public class DFSConfigKeys extends CommonConfigurationKeys { // HDFS Router-based federation public static final String FEDERATION_ROUTER_PREFIX = "dfs.federation.router."; + public static final String DFS_ROUTER_DEFAULT_NAMESERVICE = + FEDERATION_ROUTER_PREFIX + "default.nameserviceId"; + public static final String DFS_ROUTER_HANDLER_COUNT_KEY = + FEDERATION_ROUTER_PREFIX + "handler.count"; + public static final int DFS_ROUTER_HANDLER_COUNT_DEFAULT = 10; + public static final String DFS_ROUTER_READER_QUEUE_SIZE_KEY = + FEDERATION_ROUTER_PREFIX + "reader.queue.size"; + public static final int DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT = 100; + public static final String DFS_ROUTER_READER_COUNT_KEY = + FEDERATION_ROUTER_PREFIX + "reader.count"; + public static final int DFS_ROUTER_READER_COUNT_DEFAULT = 1; + public static final String DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY = + FEDERATION_ROUTER_PREFIX + "handler.queue.size"; + public static final int DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT = 100; + public static final String DFS_ROUTER_RPC_BIND_HOST_KEY = + FEDERATION_ROUTER_PREFIX + "rpc-bind-host"; + public static final int DFS_ROUTER_RPC_PORT_DEFAULT = 8888; + public static final String DFS_ROUTER_RPC_ADDRESS_KEY = + FEDERATION_ROUTER_PREFIX + "rpc-address"; + public static final String DFS_ROUTER_RPC_ADDRESS_DEFAULT = + "0.0.0.0:" + 
DFS_ROUTER_RPC_PORT_DEFAULT; + public static final String DFS_ROUTER_RPC_ENABLE = + FEDERATION_ROUTER_PREFIX + "rpc.enable"; + public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true; + + // HDFS Router NN client + public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE = + FEDERATION_ROUTER_PREFIX + "connection.pool-size"; + public static final int DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT = + 64; + public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN = + FEDERATION_ROUTER_PREFIX + "connection.pool.clean.ms"; + public static final long DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN_DEFAULT = + TimeUnit.MINUTES.toMillis(1); + public static final String DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS = + FEDERATION_ROUTER_PREFIX + "connection.clean.ms"; + public static final long DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT = + TimeUnit.SECONDS.toMillis(10); // HDFS Router State Store connection public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS = http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6989725/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamespaceInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamespaceInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamespaceInfo.java index bbaeca3..edcd308 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamespaceInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamespaceInfo.java @@ -23,15 +23,14 @@ import org.apache.hadoop.hdfs.server.federation.router.RemoteLocationContext; * Represents information about a single nameservice/namespace in a federated * 
HDFS cluster. */ -public class FederationNamespaceInfo - implements Comparable<FederationNamespaceInfo>, RemoteLocationContext { +public class FederationNamespaceInfo extends RemoteLocationContext { /** Block pool identifier. */ - private String blockPoolId; + private final String blockPoolId; /** Cluster identifier. */ - private String clusterId; + private final String clusterId; /** Nameservice identifier. */ - private String nameserviceId; + private final String nameserviceId; public FederationNamespaceInfo(String bpId, String clId, String nsId) { this.blockPoolId = bpId; @@ -39,15 +38,16 @@ public class FederationNamespaceInfo this.nameserviceId = nsId; } - /** - * The HDFS nameservice id for this namespace. - * - * @return Nameservice identifier. - */ + @Override public String getNameserviceId() { return this.nameserviceId; } + @Override + public String getDest() { + return this.nameserviceId; + } + /** * The HDFS cluster id for this namespace. * @@ -67,33 +67,7 @@ public class FederationNamespaceInfo } @Override - public int hashCode() { - return this.nameserviceId.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } else if (obj instanceof FederationNamespaceInfo) { - return this.compareTo((FederationNamespaceInfo) obj) == 0; - } else { - return false; - } - } - - @Override - public int compareTo(FederationNamespaceInfo info) { - return this.nameserviceId.compareTo(info.getNameserviceId()); - } - - @Override public String toString() { return this.nameserviceId + "->" + this.blockPoolId + ":" + this.clusterId; } - - @Override - public String getDest() { - return this.nameserviceId; - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6989725/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java index eef136d..6aa12ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java @@ -17,34 +17,51 @@ */ package org.apache.hadoop.hdfs.server.federation.resolver; -import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.hadoop.hdfs.server.federation.router.RemoteLocationContext; /** * A single in a remote namespace consisting of a nameservice ID * and a HDFS path. */ -public class RemoteLocation implements RemoteLocationContext { +public class RemoteLocation extends RemoteLocationContext { /** Identifier of the remote namespace for this location. */ - private String nameserviceId; + private final String nameserviceId; + /** Identifier of the namenode in the namespace for this location. */ + private final String namenodeId; /** Path in the remote location. */ - private String path; + private final String path; /** * Create a new remote location. * + * @param nsId Destination namespace. + * @param pPath Path in the destination namespace. + */ + public RemoteLocation(String nsId, String pPath) { + this(nsId, null, pPath); + } + + /** + * Create a new remote location pointing to a particular namenode in the + * namespace. + * * @param nsId Destination namespace. * @param pPath Path in the destination namespace. 
*/ - public RemoteLocation(String nsId, String pPath) { + public RemoteLocation(String nsId, String nnId, String pPath) { this.nameserviceId = nsId; + this.namenodeId = nnId; this.path = pPath; } @Override public String getNameserviceId() { - return this.nameserviceId; + String ret = this.nameserviceId; + if (this.namenodeId != null) { + ret += "-" + this.namenodeId; + } + return ret; } @Override @@ -54,21 +71,6 @@ public class RemoteLocation implements RemoteLocationContext { @Override public String toString() { - return this.nameserviceId + "->" + this.path; - } - - @Override - public int hashCode() { - return new HashCodeBuilder(17, 31) - .append(this.nameserviceId) - .append(this.path) - .toHashCode(); - } - - @Override - public boolean equals(Object obj) { - return (obj != null && - obj.getClass() == this.getClass() && - obj.hashCode() == this.hashCode()); + return getNameserviceId() + "->" + this.path; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6989725/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java new file mode 100644 index 0000000..1d27b51 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.ipc.RPC; + +/** + * Context to track a connection in a {@link ConnectionPool}. When a client uses + * a connection, it increments a counter to mark it as active. Once the client + * is done with the connection, it decreases the counter. It also takes care of + * closing the connection once it is no longer active. + */ +public class ConnectionContext { + + /** Client for the connection. */ + private final ProxyAndInfo<ClientProtocol> client; + /** How many threads are using this connection. */ + private int numThreads = 0; + /** If the connection is closed. */ + private boolean closed = false; + + + public ConnectionContext(ProxyAndInfo<ClientProtocol> connection) { + this.client = connection; + } + + /** + * Check if the connection is active. + * + * @return True if the connection is active. + */ + public synchronized boolean isActive() { + return this.numThreads > 0; + } + + /** + * Check if the connection is closed. + * + * @return If the connection is closed. + */ + public synchronized boolean isClosed() { + return this.closed; + } + + /** + * Check if the connection can be used. It checks if the connection is used by + * another thread or already closed. 
+ * + * @return True if the connection can be used. + */ + public synchronized boolean isUsable() { + return !isActive() && !isClosed(); + } + + /** + * Get the connection client. + * + * @return Connection client. + */ + public synchronized ProxyAndInfo<ClientProtocol> getClient() { + this.numThreads++; + return this.client; + } + + /** + * Release this connection. If the connection was closed, close the proxy. + * Otherwise, mark the connection as not used by us anymore. + */ + public synchronized void release() { + if (--this.numThreads == 0 && this.closed) { + close(); + } + } + + /** + * We will not use this connection anymore. If it's not being used, we close + * it. Otherwise, we let release() do it once we are done with it. + */ + public synchronized void close() { + this.closed = true; + if (this.numThreads == 0) { + ClientProtocol proxy = this.client.getProxy(); + // Nobody should be using this anymore so it should close right away + RPC.stopProxy(proxy); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6989725/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java new file mode 100644 index 0000000..d93d498 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java @@ -0,0 +1,408 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implements a pool of connections for the {@link Router} to be able to open + * many connections to many Namenodes. + */ +public class ConnectionManager { + + private static final Logger LOG = + LoggerFactory.getLogger(ConnectionManager.class); + + /** Number of parallel new connections to create. */ + protected static final int MAX_NEW_CONNECTIONS = 100; + + /** Minimum amount of active connections: 50%. */ + protected static final float MIN_ACTIVE_RATIO = 0.5f; + + + /** Configuration for the connection manager, pool and sockets. 
*/ + private final Configuration conf; + + /** Min number of connections per user + nn. */ + private final int minSize = 1; + /** Max number of connections per user + nn. */ + private final int maxSize; + + /** How often we close a pool for a particular user + nn. */ + private final long poolCleanupPeriodMs; + /** How often we close a connection in a pool. */ + private final long connectionCleanupPeriodMs; + + /** Map of connection pools, one pool per user + NN. */ + private final Map<ConnectionPoolId, ConnectionPool> pools; + /** Lock for accessing pools. */ + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final Lock readLock = readWriteLock.readLock(); + private final Lock writeLock = readWriteLock.writeLock(); + + /** Queue for creating new connections. */ + private final BlockingQueue<ConnectionPool> creatorQueue = + new ArrayBlockingQueue<>(MAX_NEW_CONNECTIONS); + /** Create new connections asynchronously. */ + private final ConnectionCreator creator; + /** Periodic executor to remove stale connection pools. */ + private final ScheduledThreadPoolExecutor cleaner = + new ScheduledThreadPoolExecutor(1); + + /** If the connection manager is running. */ + private boolean running = false; + + + /** + * Creates a proxy client connection pool manager. + * + * @param config Configuration for the connections. + * @param minPoolSize Min size of the connection pool. + * @param maxPoolSize Max size of the connection pool. 
+ */ + public ConnectionManager(Configuration config) { + this.conf = config; + + // Configure minimum and maximum connection pools + this.maxSize = this.conf.getInt( + DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE, + DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT); + + // Map with the connections indexed by UGI and Namenode + this.pools = new HashMap<>(); + + // Create connections in a thread asynchronously + this.creator = new ConnectionCreator(creatorQueue); + this.creator.setDaemon(true); + + // Cleanup periods + this.poolCleanupPeriodMs = this.conf.getLong( + DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN, + DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN_DEFAULT); + LOG.info("Cleaning connection pools every {} seconds", + TimeUnit.MILLISECONDS.toSeconds(this.poolCleanupPeriodMs)); + this.connectionCleanupPeriodMs = this.conf.getLong( + DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS, + DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT); + LOG.info("Cleaning connections every {} seconds", + TimeUnit.MILLISECONDS.toSeconds(this.connectionCleanupPeriodMs)); + } + + /** + * Start the connection manager. + */ + public void start() { + // Start the thread that creates connections asynchronously + this.creator.start(); + + // Schedule a task to remove stale connection pools and sockets + long recyleTimeMs = Math.min( + poolCleanupPeriodMs, connectionCleanupPeriodMs); + LOG.info("Cleaning every {} seconds", + TimeUnit.MILLISECONDS.toSeconds(recyleTimeMs)); + this.cleaner.scheduleAtFixedRate( + new CleanupTask(), 0, recyleTimeMs, TimeUnit.MILLISECONDS); + + // Mark the manager as running + this.running = true; + } + + /** + * Stop the connection manager by closing all the pools. 
+ */ + public void close() { + this.creator.shutdown(); + this.cleaner.shutdown(); + this.running = false; + + writeLock.lock(); + try { + for (ConnectionPool pool : this.pools.values()) { + pool.close(); + } + this.pools.clear(); + } finally { + writeLock.unlock(); + } + } + + /** + * Fetches the next available proxy client in the pool. Each client connection + * is reserved for a single user and cannot be reused until free. + * + * @param ugi User group information. + * @param nnAddress Namenode address for the connection. + * @return Proxy client to connect to nnId as UGI. + * @throws IOException If the connection cannot be obtained. + */ + public ConnectionContext getConnection( + UserGroupInformation ugi, String nnAddress) throws IOException { + + // Check if the manager is shutdown + if (!this.running) { + LOG.error( + "Cannot get a connection to {} because the manager isn't running", + nnAddress); + return null; + } + + // Try to get the pool if created + ConnectionPoolId connectionId = new ConnectionPoolId(ugi, nnAddress); + ConnectionPool pool = null; + readLock.lock(); + try { + pool = this.pools.get(connectionId); + } finally { + readLock.unlock(); + } + + // Create the pool if not created before + if (pool == null) { + writeLock.lock(); + try { + pool = this.pools.get(connectionId); + if (pool == null) { + pool = new ConnectionPool( + this.conf, nnAddress, ugi, this.minSize, this.maxSize); + this.pools.put(connectionId, pool); + } + } finally { + writeLock.unlock(); + } + } + + ConnectionContext conn = pool.getConnection(); + + // Add a new connection to the pool if it wasn't usable + if (conn == null || !conn.isUsable()) { + if (!this.creatorQueue.offer(pool)) { + LOG.error("Cannot add more than {} connections at the same time", + MAX_NEW_CONNECTIONS); + } + } + + if (conn != null && conn.isClosed()) { + LOG.error("We got a closed connection from {}", pool); + conn = null; + } + + return conn; + } + + /** + * Get the number of connection pools. 
+ * + * @return Number of connection pools. + */ + public int getNumConnectionPools() { + readLock.lock(); + try { + return pools.size(); + } finally { + readLock.unlock(); + } + } + + /** + * Get number of open connections. + * + * @return Number of open connections. + */ + public int getNumConnections() { + int total = 0; + readLock.lock(); + try { + for (ConnectionPool pool : this.pools.values()) { + total += pool.getNumConnections(); + } + } finally { + readLock.unlock(); + } + return total; + } + + /** + * Get number of active connections. + * + * @return Number of active connections. + */ + public int getNumActiveConnections() { + int total = 0; + readLock.lock(); + try { + for (ConnectionPool pool : this.pools.values()) { + total += pool.getNumActiveConnections(); + } + } finally { + readLock.unlock(); + } + return total; + } + + /** + * Get the number of connections to be created. + * + * @return Number of connections to be created. + */ + public int getNumCreatingConnections() { + return this.creatorQueue.size(); + } + + /** + * Removes stale connections not accessed recently from the pool. This is + * invoked periodically. 
+ */ + private class CleanupTask implements Runnable { + + @Override + public void run() { + long currentTime = Time.now(); + List<ConnectionPoolId> toRemove = new LinkedList<>(); + + // Look for stale pools + readLock.lock(); + try { + for (Entry<ConnectionPoolId, ConnectionPool> entry : pools.entrySet()) { + ConnectionPool pool = entry.getValue(); + long lastTimeActive = pool.getLastActiveTime(); + boolean isStale = + currentTime > (lastTimeActive + poolCleanupPeriodMs); + if (lastTimeActive > 0 && isStale) { + // Remove this pool + LOG.debug("Closing and removing stale pool {}", pool); + pool.close(); + ConnectionPoolId poolId = entry.getKey(); + toRemove.add(poolId); + } else { + // Keep this pool but clean connections inside + LOG.debug("Cleaning up {}", pool); + cleanup(pool); + } + } + } finally { + readLock.unlock(); + } + + // Remove stale pools + if (!toRemove.isEmpty()) { + writeLock.lock(); + try { + for (ConnectionPoolId poolId : toRemove) { + pools.remove(poolId); + } + } finally { + writeLock.unlock(); + } + } + } + + /** + * Clean the unused connections for this pool. + * + * @param pool Connection pool to cleanup. + */ + private void cleanup(ConnectionPool pool) { + if (pool.getNumConnections() > pool.getMinSize()) { + // Check if this pool hasn't been active in a while or not 50% of its connections are used + long timeSinceLastActive = Time.now() - pool.getLastActiveTime(); + int total = pool.getNumConnections(); + int active = pool.getNumActiveConnections(); + if (timeSinceLastActive > connectionCleanupPeriodMs || + active < MIN_ACTIVE_RATIO * total) { + // Remove and close 1 connection + List<ConnectionContext> conns = pool.removeConnections(1); + for (ConnectionContext conn : conns) { + conn.close(); + } + LOG.debug("Removed connection {} used {} seconds ago. 
" + + "Pool has {}/{} connections", pool.getConnectionPoolId(), + TimeUnit.MILLISECONDS.toSeconds(timeSinceLastActive), + pool.getNumConnections(), pool.getMaxSize()); + } + } + } + } + + /** + * Thread that creates connections asynchronously. + */ + private static class ConnectionCreator extends Thread { + /** If the creator is running. */ + private boolean running = true; + /** Queue to push work to. */ + private BlockingQueue<ConnectionPool> queue; + + ConnectionCreator(BlockingQueue<ConnectionPool> blockingQueue) { + super("Connection creator"); + this.queue = blockingQueue; + } + + @Override + public void run() { + while (this.running) { + try { + ConnectionPool pool = this.queue.take(); + try { + int total = pool.getNumConnections(); + int active = pool.getNumActiveConnections(); + if (pool.getNumConnections() < pool.getMaxSize() && + active >= MIN_ACTIVE_RATIO * total) { + ConnectionContext conn = pool.newConnection(); + pool.addConnection(conn); + } else { + LOG.debug("Cannot add more than {} connections to {}", + pool.getMaxSize(), pool); + } + } catch (IOException e) { + LOG.error("Cannot create a new connection", e); + } + } catch (InterruptedException e) { + LOG.error("The connection creator was interrupted"); + this.running = false; + } + } + } + + /** + * Stop this connection creator. 
+ */ + public void shutdown() { + this.running = false; + this.interrupt(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6989725/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java new file mode 100644 index 0000000..f76f621 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java @@ -0,0 +1,314 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.net.SocketFactory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryUtils; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Maintains a pool of connections for each User (including tokens) + NN. The + * RPC client maintains a single socket, to achieve throughput similar to a NN, + * each request is multiplexed across multiple sockets/connections from a + * pool. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ConnectionPool { + + private static final Logger LOG = + LoggerFactory.getLogger(ConnectionPool.class); + + + /** Configuration settings for the connection pool. */ + private final Configuration conf; + + /** Identifier for this connection pool. 
*/ + private final ConnectionPoolId connectionPoolId; + /** Namenode this pool connects to. */ + private final String namenodeAddress; + /** User for this connections. */ + private final UserGroupInformation ugi; + + /** Pool of connections. We mimic a COW array. */ + private volatile List<ConnectionContext> connections = new ArrayList<>(); + /** Connection index for round-robin. */ + private final AtomicInteger clientIndex = new AtomicInteger(0); + + /** Min number of connections per user. */ + private final int minSize; + /** Max number of connections per user. */ + private final int maxSize; + + /** The last time a connection was active. */ + private volatile long lastActiveTime = 0; + + + protected ConnectionPool(Configuration config, String address, + UserGroupInformation user, int minPoolSize, int maxPoolSize) + throws IOException { + + this.conf = config; + + // Connection pool target + this.ugi = user; + this.namenodeAddress = address; + this.connectionPoolId = + new ConnectionPoolId(this.ugi, this.namenodeAddress); + + // Set configuration parameters for the pool + this.minSize = minPoolSize; + this.maxSize = maxPoolSize; + + // Add minimum connections to the pool + for (int i=0; i<this.minSize; i++) { + ConnectionContext newConnection = newConnection(); + this.connections.add(newConnection); + } + LOG.debug("Created connection pool \"{}\" with {} connections", + this.connectionPoolId, this.minSize); + } + + /** + * Get the maximum number of connections allowed in this pool. + * + * @return Maximum number of connections. + */ + protected int getMaxSize() { + return this.maxSize; + } + + /** + * Get the minimum number of connections in this pool. + * + * @return Minimum number of connections. + */ + protected int getMinSize() { + return this.minSize; + } + + /** + * Get the connection pool identifier. + * + * @return Connection pool identifier. 
+ */ + protected ConnectionPoolId getConnectionPoolId() { + return this.connectionPoolId; + } + + /** + * Return the next connection round-robin. Starting at the round-robin + * position, the first connection that reports itself usable is returned; if + * none does, the last connection inspected is returned anyway. + * + * @return Connection context, or null if the pool is empty. + */ + protected ConnectionContext getConnection() { + + this.lastActiveTime = Time.now(); + + // Get a connection from the pool following round-robin + ConnectionContext conn = null; + List<ConnectionContext> tmpConnections = this.connections; + int size = tmpConnections.size(); + // Mask off the sign bit: the counter wraps to negative values once it + // passes Integer.MAX_VALUE and % would then produce a negative index + int threadIndex = this.clientIndex.getAndIncrement() & 0x7FFFFFFF; + for (int i=0; i<size; i++) { + // Reduce before adding i so that the sum cannot overflow + int index = (threadIndex % size + i) % size; + conn = tmpConnections.get(index); + if (conn != null && conn.isUsable()) { + return conn; + } + } + + // We return a connection even if it's active + return conn; + } + + /** + * Add a connection to the current pool. It uses a Copy-On-Write approach. + * + * @param conn New connection to add to the pool. + */ + public synchronized void addConnection(ConnectionContext conn) { + List<ConnectionContext> tmpConnections = new ArrayList<>(this.connections); + tmpConnections.add(conn); + this.connections = tmpConnections; + + this.lastActiveTime = Time.now(); + } + + /** + * Remove connections from the end of the current pool. The pool never + * shrinks below its minimum size. The removed connections are returned to + * the caller; they are not closed here. + * + * @param num Number of connections to remove. + * @return Removed connections. + */ + public synchronized List<ConnectionContext> removeConnections(int num) { + List<ConnectionContext> removed = new LinkedList<>(); + + // Keep the first connections and hand back the trailing ones + List<ConnectionContext> tmpConnections = new ArrayList<>(); + for (int i=0; i<this.connections.size(); i++) { + ConnectionContext conn = this.connections.get(i); + if (i < this.minSize || i < this.connections.size() - num) { + tmpConnections.add(conn); + } else { + removed.add(conn); + } + } + this.connections = tmpConnections; + + return removed; + } + + /** + * Close the connection pool. 
+ */ + protected synchronized void close() { + long timeSinceLastActive = TimeUnit.MILLISECONDS.toSeconds( + Time.now() - getLastActiveTime()); + LOG.debug("Shutting down connection pool \"{}\" used {} seconds ago", + this.connectionPoolId, timeSinceLastActive); + + // Publish an empty list instead of clearing the current one in place: + // unsynchronized readers may still be working on the previous snapshot + List<ConnectionContext> oldConnections = this.connections; + this.connections = new ArrayList<>(); + for (ConnectionContext connection : oldConnections) { + connection.close(); + } + } + + /** + * Number of connections in the pool. + * + * @return Number of connections. + */ + protected int getNumConnections() { + return this.connections.size(); + } + + /** + * Number of active connections in the pool. + * + * @return Number of active connections. + */ + protected int getNumActiveConnections() { + int ret = 0; + + List<ConnectionContext> tmpConnections = this.connections; + for (ConnectionContext conn : tmpConnections) { + if (conn.isActive()) { + ret++; + } + } + return ret; + } + + /** + * Get the last time the connection pool was used. + * + * @return Last time the connection pool was used. + */ + protected long getLastActiveTime() { + return this.lastActiveTime; + } + + @Override + public String toString() { + return this.connectionPoolId.toString(); + } + + /** + * Create a new proxy wrapper for a client NN connection. + * @return Proxy for the target ClientProtocol that contains the user's + * security context. + * @throws IOException If the connection cannot be created. + */ + public ConnectionContext newConnection() throws IOException { + return newConnection(this.conf, this.namenodeAddress, this.ugi); + } + + /** + * Creates a proxy wrapper for a client NN connection. Each proxy contains + * context for a single user/security context. To maximize throughput it is + * recommended to use multiple connections per user+server, allowing multiple + * writes and reads to be dispatched in parallel. + * + * @param conf Configuration for the connection. + * @param nnAddress Address of server supporting the ClientProtocol. + * @param ugi User context. 
+ * @return Proxy for the target ClientProtocol that contains the user's + * security context. + * @throws IOException If it cannot be created. + */ + protected static ConnectionContext newConnection(Configuration conf, + String nnAddress, UserGroupInformation ugi) + throws IOException { + RPC.setProtocolEngine( + conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class); + + final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy( + conf, + HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY, + HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT, + HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY, + HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT, + HdfsConstants.SAFEMODE_EXCEPTION_CLASS_NAME); + + SocketFactory factory = SocketFactory.getDefault(); + if (UserGroupInformation.isSecurityEnabled()) { + SaslRpcServer.init(conf); + } + InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress); + final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class); + ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy( + ClientNamenodeProtocolPB.class, version, socket, ugi, conf, + factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy(); + ClientProtocol client = new ClientNamenodeProtocolTranslatorPB(proxy); + Text dtService = SecurityUtil.buildTokenService(socket); + + ProxyAndInfo<ClientProtocol> clientProxy = + new ProxyAndInfo<ClientProtocol>(client, dtService, socket); + ConnectionContext connection = new ConnectionContext(clientProxy); + return connection; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6989725/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java new file mode 100644 index 0000000..a3a78de --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; + +/** + * Identifier for a connection for a user to a namenode. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ConnectionPoolId implements Comparable<ConnectionPoolId> { + + /** Namenode identifier. */ + private final String nnId; + /** Information about the user. 
*/ + private final UserGroupInformation ugi; + + /** + * New connection pool identifier. + * + * @param ugi Information of the user issuing the request. + * @param nnId Namenode address with port. + */ + public ConnectionPoolId(final UserGroupInformation ugi, final String nnId) { + this.nnId = nnId; + this.ugi = ugi; + } + + @Override + public int hashCode() { + int hash = new HashCodeBuilder(17, 31) + .append(this.nnId) + .append(this.ugi.toString()) + .append(this.getTokenIds()) + .toHashCode(); + return hash; + } + + @Override + public boolean equals(Object o) { + if (o instanceof ConnectionPoolId) { + ConnectionPoolId other = (ConnectionPoolId) o; + if (!this.nnId.equals(other.nnId)) { + return false; + } + if (!this.ugi.toString().equals(other.ugi.toString())) { + return false; + } + String thisTokens = this.getTokenIds().toString(); + String otherTokens = other.getTokenIds().toString(); + return thisTokens.equals(otherTokens); + } + return false; + } + + @Override + public String toString() { + return this.ugi + " " + this.getTokenIds() + "->" + this.nnId; + } + + @Override + public int compareTo(ConnectionPoolId other) { + int ret = this.nnId.compareTo(other.nnId); + if (ret == 0) { + ret = this.ugi.toString().compareTo(other.ugi.toString()); + } + if (ret == 0) { + String thisTokens = this.getTokenIds().toString(); + String otherTokens = other.getTokenIds().toString(); + ret = thisTokens.compareTo(otherTokens); + } + return ret; + } + + /** + * Get the token identifiers for this connection. + * @return List with the token identifiers. + */ + private List<String> getTokenIds() { + List<String> tokenIds = new ArrayList<>(); + Collection<Token<? extends TokenIdentifier>> tokens = this.ugi.getTokens(); + for (Token<? 
extends TokenIdentifier> token : tokens) { + byte[] tokenIdBytes = token.getIdentifier(); + String tokenId = Arrays.toString(tokenIdBytes); + tokenIds.add(tokenId); + } + Collections.sort(tokenIds); + return tokenIds; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6989725/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java index da6066b..a90c460 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java @@ -17,22 +17,52 @@ */ package org.apache.hadoop.hdfs.server.federation.router; +import org.apache.commons.lang.builder.HashCodeBuilder; + /** - * Interface for objects that are unique to a namespace. + * Base class for objects that are unique to a namespace. */ -public interface RemoteLocationContext { +public abstract class RemoteLocationContext + implements Comparable<RemoteLocationContext> { /** * Returns an identifier for a unique namespace. * * @return Namespace identifier. */ - String getNameserviceId(); + public abstract String getNameserviceId(); /** * Destination in this location. For example the path in a remote namespace. * * @return Destination in this location. 
*/ - String getDest(); + public abstract String getDest(); + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 31) + .append(getNameserviceId()) + .append(getDest()) + .toHashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof RemoteLocationContext) { + RemoteLocationContext other = (RemoteLocationContext) obj; + return this.getNameserviceId().equals(other.getNameserviceId()) && + this.getDest().equals(other.getDest()); + } + return false; + } + + @Override + public int compareTo(RemoteLocationContext info) { + int ret = this.getNameserviceId().compareTo(info.getNameserviceId()); + if (ret == 0) { + ret = this.getDest().compareTo(info.getDest()); + } + return ret; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6989725/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java new file mode 100644 index 0000000..cd57d45 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.Arrays; + +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Determines the remote client protocol method and the parameter list for a + * specific location. + */ +public class RemoteMethod { + + private static final Logger LOG = LoggerFactory.getLogger(RemoteMethod.class); + + + /** List of parameters: static and dynamic values, matching the types. */ + private final Object[] params; + /** List of method parameter types, matches the parameters. */ + private final Class<?>[] types; + /** String name of the ClientProtocol method. */ + private final String methodName; + + /** + * Create a method with no parameters. + * + * @param method The string name of the ClientProtocol method. + */ + public RemoteMethod(String method) { + this.params = null; + this.types = null; + this.methodName = method; + } + + /** + * Creates a remote method generator. + * + * @param method The string name of the ClientProtocol method. + * @param pTypes A list of types to use to locate the specific method. + * @param pParams A list of parameters for the method. The order of the + * parameter list must match the order and number of the types. + * Parameters are grouped into 2 categories: + * <ul> + * <li>Static parameters that are immutable across locations. + * <li>Dynamic parameters that are determined for each location by a + * RemoteParam object. 
To specify a dynamic parameter, pass an + * instance of RemoteParam in place of the parameter value. + * </ul> + * @throws IOException If the types and parameter lists are not valid. + */ + public RemoteMethod(String method, Class<?>[] pTypes, Object... pParams) + throws IOException { + + if (pParams.length != pTypes.length) { + throw new IOException("Invalid parameters for method " + method); + } + + this.params = pParams; + this.types = pTypes; + this.methodName = method; + } + + /** + * Get the represented java method. + * + * @return Method + * @throws IOException If the method cannot be found. + */ + public Method getMethod() throws IOException { + try { + if (types != null) { + return ClientProtocol.class.getDeclaredMethod(methodName, types); + } else { + return ClientProtocol.class.getDeclaredMethod(methodName); + } + } catch (NoSuchMethodException e) { + // Re-throw as an IOException + LOG.error("Cannot get method {} with types {}", + methodName, Arrays.toString(types), e); + throw new IOException(e); + } catch (SecurityException e) { + LOG.error("Cannot access method {} with types {}", + methodName, Arrays.toString(types), e); + throw new IOException(e); + } + } + + /** + * Get the calling types for this method. + * + * @return An array of calling types. + */ + public Class<?>[] getTypes() { + return this.types; + } + + /** + * Generate a list of parameters for this specific location using no context. + * + * @return A list of parameters for the method customized for the location. + */ + public Object[] getParams() { + return this.getParams(null); + } + + /** + * Get the name of the method. + * + * @return Name of the method. + */ + public String getMethodName() { + return this.methodName; + } + + /** + * Generate a list of parameters for this specific location. Parameters are + * grouped into 2 categories: + * <ul> + * <li>Static parameters that are immutable across locations. 
+ * <li>Dynamic parameters that are determined for each location by a + * RemoteParam object. + * </ul> + * + * @param context The context identifying the location. + * @return A list of parameters for the method customized for the location. + */ + public Object[] getParams(RemoteLocationContext context) { + if (this.params == null) { + return new Object[] {}; + } + Object[] objList = new Object[this.params.length]; + for (int i = 0; i < this.params.length; i++) { + Object currentObj = this.params[i]; + if (currentObj instanceof RemoteParam) { + // Map the parameter using the context + RemoteParam paramGetter = (RemoteParam) currentObj; + objList[i] = paramGetter.getParameterForContext(context); + } else { + objList[i] = currentObj; + } + } + return objList; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6989725/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java new file mode 100644 index 0000000..8816ff6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.util.Map; + +/** + * A dynamically assignable parameter that is location-specific. + * <p> + * There are 2 ways this mapping is determined: + * <ul> + * <li>Default: Uses the RemoteLocationContext's destination + * <li>Map: Uses the value of the RemoteLocationContext key provided in the + * parameter map. + * </ul> + */ +public class RemoteParam { + + /** Values keyed by location; null when the destination is used instead. */ + private final Map<? extends Object, ? extends Object> paramMap; + + /** + * Constructs a default remote parameter. Always maps the value to the + * destination of the provided RemoteLocationContext. + */ + public RemoteParam() { + this.paramMap = null; + } + + /** + * Constructs a map based remote parameter. Determines the value using the + * provided RemoteLocationContext as a key into the map. + * + * @param map Map with RemoteLocationContext keys. + */ + public RemoteParam( + Map<? extends RemoteLocationContext, ? extends Object> map) { + this.paramMap = map; + } + + /** + * Determine the appropriate value for this parameter based on the location. + * + * @param context Context identifying the location. + * @return A parameter specific to this location. 
+ */ + public Object getParameterForContext(RemoteLocationContext context) { + if (context == null) { + return null; + } else if (this.paramMap != null) { + return this.paramMap.get(context); + } else { + // Default case + return context.getDest(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6989725/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java index fe0d02a..019a5cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java @@ -22,12 +22,14 @@ import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.new import static org.apache.hadoop.util.ExitUtil.terminate; import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; @@ -36,6 +38,8 @@ import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Router that provides a unified view of multiple federated HDFS clusters. It @@ -60,7 +64,7 @@ import org.apache.hadoop.util.StringUtils; @InterfaceStability.Evolving public class Router extends CompositeService { - private static final Log LOG = LogFactory.getLog(Router.class); + private static final Logger LOG = LoggerFactory.getLogger(Router.class); /** Configuration for the Router. */ @@ -71,6 +75,7 @@ public class Router extends CompositeService { /** RPC interface to the client. */ private RouterRpcServer rpcServer; + private InetSocketAddress rpcAddress; /** Interface with the State Store. */ private StateStoreService stateStore; @@ -105,9 +110,6 @@ public class Router extends CompositeService { protected void serviceInit(Configuration configuration) throws Exception { this.conf = configuration; - // TODO Interface to the State Store - this.stateStore = null; - // Resolver to track active NNs this.namenodeResolver = newActiveNamenodeResolver( this.conf, this.stateStore); @@ -122,6 +124,15 @@ public class Router extends CompositeService { throw new IOException("Cannot find subcluster resolver"); } + if (conf.getBoolean( + DFSConfigKeys.DFS_ROUTER_RPC_ENABLE, + DFSConfigKeys.DFS_ROUTER_RPC_ENABLE_DEFAULT)) { + // Create RPC server + this.rpcServer = createRpcServer(); + addService(this.rpcServer); + this.setRpcServerAddress(rpcServer.getRpcAddress()); + } + super.serviceInit(conf); } @@ -171,11 +182,13 @@ public class Router extends CompositeService { router.init(conf); router.start(); } catch (Throwable e) { - LOG.error("Failed to start router.", e); + LOG.error("Failed to start router", e); terminate(1, e); } } + + ///////////////////////////////////////////////////////// // RPC Server ///////////////////////////////////////////////////////// @@ -183,7 +196,7 @@ public class Router extends CompositeService { /** * Create a new Router RPC server to proxy ClientProtocol requests. 
* - * @return RouterRpcServer + * @return New Router RPC Server. * @throws IOException If the router RPC server was not started. */ protected RouterRpcServer createRpcServer() throws IOException { @@ -200,6 +213,35 @@ public class Router extends CompositeService { return this.rpcServer; } + /** + * Set the current RPC socket for the router. + * + * @param rpcAddress RPC address. + */ + protected void setRpcServerAddress(InetSocketAddress address) { + this.rpcAddress = address; + + // Use the RPC address as our unique router Id + if (this.rpcAddress != null) { + try { + String hostname = InetAddress.getLocalHost().getHostName(); + setRouterId(hostname + ":" + this.rpcAddress.getPort()); + } catch (UnknownHostException ex) { + LOG.error("Cannot set unique router ID, address not resolvable {}", + this.rpcAddress); + } + } + } + + /** + * Get the current RPC socket address for the router. + * + * @return InetSocketAddress + */ + public InetSocketAddress getRpcServerAddress() { + return this.rpcAddress; + } + ///////////////////////////////////////////////////////// // Submodule getters ///////////////////////////////////////////////////////// --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org