HDFS-11546. Federation Router RPC server. Contributed by Jason Kace and Inigo 
Goiri.

(cherry picked from commit 8a9cdebebf26841a0f1e99fb08135f4597f2eba2)
(cherry picked from commit ca4f209b49e3aad6a80306f7342c9b6b560a79a7)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e6989725
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e6989725
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e6989725

Branch: refs/heads/branch-2
Commit: e69897253daa4749153143935459543e8ecadb6e
Parents: 93687da
Author: Inigo Goiri <inigo...@apache.org>
Authored: Thu May 11 09:57:03 2017 -0700
Committer: vrushali <vrush...@apache.org>
Committed: Fri Oct 20 11:22:30 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   38 +
 .../resolver/FederationNamespaceInfo.java       |   46 +-
 .../federation/resolver/RemoteLocation.java     |   46 +-
 .../federation/router/ConnectionContext.java    |  104 +
 .../federation/router/ConnectionManager.java    |  408 ++++
 .../federation/router/ConnectionPool.java       |  314 +++
 .../federation/router/ConnectionPoolId.java     |  117 ++
 .../router/RemoteLocationContext.java           |   38 +-
 .../server/federation/router/RemoteMethod.java  |  164 ++
 .../server/federation/router/RemoteParam.java   |   71 +
 .../hdfs/server/federation/router/Router.java   |   58 +-
 .../federation/router/RouterRpcClient.java      |  856 ++++++++
 .../federation/router/RouterRpcServer.java      | 1867 +++++++++++++++++-
 .../src/main/resources/hdfs-default.xml         |   95 +
 .../server/federation/FederationTestUtils.java  |   80 +-
 .../hdfs/server/federation/MockResolver.java    |   90 +-
 .../server/federation/RouterConfigBuilder.java  |   20 +-
 .../server/federation/RouterDFSCluster.java     |  535 +++--
 .../server/federation/router/TestRouter.java    |   31 +-
 .../server/federation/router/TestRouterRpc.java |  869 ++++++++
 .../router/TestRouterRpcMultiDestination.java   |  216 ++
 21 files changed, 5675 insertions(+), 388 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6989725/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 1b66ead..5d6c467 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1012,6 +1012,44 @@ public class DFSConfigKeys extends 
CommonConfigurationKeys {
   // HDFS Router-based federation
   public static final String FEDERATION_ROUTER_PREFIX =
       "dfs.federation.router.";
+  public static final String DFS_ROUTER_DEFAULT_NAMESERVICE =
+      FEDERATION_ROUTER_PREFIX + "default.nameserviceId";
+  public static final String DFS_ROUTER_HANDLER_COUNT_KEY =
+      FEDERATION_ROUTER_PREFIX + "handler.count";
+  public static final int DFS_ROUTER_HANDLER_COUNT_DEFAULT = 10;
+  public static final String DFS_ROUTER_READER_QUEUE_SIZE_KEY =
+      FEDERATION_ROUTER_PREFIX + "reader.queue.size";
+  public static final int DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT = 100;
+  public static final String DFS_ROUTER_READER_COUNT_KEY =
+      FEDERATION_ROUTER_PREFIX + "reader.count";
+  public static final int DFS_ROUTER_READER_COUNT_DEFAULT = 1;
+  public static final String DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY =
+      FEDERATION_ROUTER_PREFIX + "handler.queue.size";
+  public static final int DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT = 100;
+  public static final String DFS_ROUTER_RPC_BIND_HOST_KEY =
+      FEDERATION_ROUTER_PREFIX + "rpc-bind-host";
+  public static final int DFS_ROUTER_RPC_PORT_DEFAULT = 8888;
+  public static final String DFS_ROUTER_RPC_ADDRESS_KEY =
+      FEDERATION_ROUTER_PREFIX + "rpc-address";
+  public static final String DFS_ROUTER_RPC_ADDRESS_DEFAULT =
+      "0.0.0.0:" + DFS_ROUTER_RPC_PORT_DEFAULT;
+  public static final String DFS_ROUTER_RPC_ENABLE =
+      FEDERATION_ROUTER_PREFIX + "rpc.enable";
+  public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
+
+  // HDFS Router NN client
+  public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE =
+      FEDERATION_ROUTER_PREFIX + "connection.pool-size";
+  public static final int DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT =
+      64;
+  public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN =
+      FEDERATION_ROUTER_PREFIX + "connection.pool.clean.ms";
+  public static final long DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN_DEFAULT =
+      TimeUnit.MINUTES.toMillis(1);
+  public static final String DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS =
+      FEDERATION_ROUTER_PREFIX + "connection.clean.ms";
+  public static final long DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT =
+      TimeUnit.SECONDS.toMillis(10);
 
   // HDFS Router State Store connection
   public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6989725/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamespaceInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamespaceInfo.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamespaceInfo.java
index bbaeca3..edcd308 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamespaceInfo.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/FederationNamespaceInfo.java
@@ -23,15 +23,14 @@ import 
org.apache.hadoop.hdfs.server.federation.router.RemoteLocationContext;
  * Represents information about a single nameservice/namespace in a federated
  * HDFS cluster.
  */
-public class FederationNamespaceInfo
-    implements Comparable<FederationNamespaceInfo>, RemoteLocationContext {
+public class FederationNamespaceInfo extends RemoteLocationContext {
 
   /** Block pool identifier. */
-  private String blockPoolId;
+  private final String blockPoolId;
   /** Cluster identifier. */
-  private String clusterId;
+  private final String clusterId;
   /** Nameservice identifier. */
-  private String nameserviceId;
+  private final String nameserviceId;
 
   public FederationNamespaceInfo(String bpId, String clId, String nsId) {
     this.blockPoolId = bpId;
@@ -39,15 +38,16 @@ public class FederationNamespaceInfo
     this.nameserviceId = nsId;
   }
 
-  /**
-   * The HDFS nameservice id for this namespace.
-   *
-   * @return Nameservice identifier.
-   */
+  @Override
   public String getNameserviceId() {
     return this.nameserviceId;
   }
 
+  @Override
+  public String getDest() {
+    return this.nameserviceId;
+  }
+
   /**
    * The HDFS cluster id for this namespace.
    *
@@ -67,33 +67,7 @@ public class FederationNamespaceInfo
   }
 
   @Override
-  public int hashCode() {
-    return this.nameserviceId.hashCode();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj == null) {
-      return false;
-    } else if (obj instanceof FederationNamespaceInfo) {
-      return this.compareTo((FederationNamespaceInfo) obj) == 0;
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public int compareTo(FederationNamespaceInfo info) {
-    return this.nameserviceId.compareTo(info.getNameserviceId());
-  }
-
-  @Override
   public String toString() {
     return this.nameserviceId + "->" + this.blockPoolId + ":" + this.clusterId;
   }
-
-  @Override
-  public String getDest() {
-    return this.nameserviceId;
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6989725/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java
index eef136d..6aa12ce 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/RemoteLocation.java
@@ -17,34 +17,51 @@
  */
 package org.apache.hadoop.hdfs.server.federation.resolver;
 
-import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.hdfs.server.federation.router.RemoteLocationContext;
 
 /**
  * A single in a remote namespace consisting of a nameservice ID
  * and a HDFS path.
  */
-public class RemoteLocation implements RemoteLocationContext {
+public class RemoteLocation extends RemoteLocationContext {
 
   /** Identifier of the remote namespace for this location. */
-  private String nameserviceId;
+  private final String nameserviceId;
+  /** Identifier of the namenode in the namespace for this location. */
+  private final String namenodeId;
   /** Path in the remote location. */
-  private String path;
+  private final String path;
 
   /**
    * Create a new remote location.
    *
+   * @param nsId
+   * @param pPath
+   */
+  public RemoteLocation(String nsId, String pPath) {
+    this(nsId, null, pPath);
+  }
+
+  /**
+   * Create a new remote location pointing to a particular namenode in the
+   * namespace.
+   *
    * @param nsId Destination namespace.
    * @param pPath Path in the destination namespace.
    */
-  public RemoteLocation(String nsId, String pPath) {
+  public RemoteLocation(String nsId, String nnId, String pPath) {
     this.nameserviceId = nsId;
+    this.namenodeId = nnId;
     this.path = pPath;
   }
 
   @Override
   public String getNameserviceId() {
-    return this.nameserviceId;
+    String ret = this.nameserviceId;
+    if (this.namenodeId != null) {
+      ret += "-" + this.namenodeId;
+    }
+    return ret;
   }
 
   @Override
@@ -54,21 +71,6 @@ public class RemoteLocation implements RemoteLocationContext 
{
 
   @Override
   public String toString() {
-    return this.nameserviceId + "->" + this.path;
-  }
-
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder(17, 31)
-        .append(this.nameserviceId)
-        .append(this.path)
-        .toHashCode();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    return (obj != null &&
-        obj.getClass() == this.getClass() &&
-        obj.hashCode() == this.hashCode());
+    return getNameserviceId() + "->" + this.path;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6989725/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
new file mode 100644
index 0000000..1d27b51
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.ipc.RPC;
+
+/**
+ * Context to track a connection in a {@link ConnectionPool}. When a client 
uses
+ * a connection, it increments a counter to mark it as active. Once the client
+ * is done with the connection, it decreases the counter. It also takes care of
+ * closing the connection once is not active.
+ */
+public class ConnectionContext {
+
+  /** Client for the connection. */
+  private final ProxyAndInfo<ClientProtocol> client;
+  /** How many threads are using this connection. */
+  private int numThreads = 0;
+  /** If the connection is closed. */
+  private boolean closed = false;
+
+
+  public ConnectionContext(ProxyAndInfo<ClientProtocol> connection) {
+    this.client = connection;
+  }
+
+  /**
+   * Check if the connection is active.
+   *
+   * @return True if the connection is active.
+   */
+  public synchronized boolean isActive() {
+    return this.numThreads > 0;
+  }
+
+  /**
+   * Check if the connection is closed.
+   *
+   * @return If the connection is closed.
+   */
+  public synchronized boolean isClosed() {
+    return this.closed;
+  }
+
+  /**
+   * Check if the connection can be used. It checks if the connection is used 
by
+   * another thread or already closed.
+   *
+   * @return True if the connection can be used.
+   */
+  public synchronized boolean isUsable() {
+    return !isActive() && !isClosed();
+  }
+
+  /**
+   * Get the connection client.
+   *
+   * @return Connection client.
+   */
+  public synchronized ProxyAndInfo<ClientProtocol> getClient() {
+    this.numThreads++;
+    return this.client;
+  }
+
+  /**
+   * Release this connection. If the connection was closed, close the proxy.
+   * Otherwise, mark the connection as not used by us anymore.
+   */
+  public synchronized void release() {
+    if (--this.numThreads == 0 && this.closed) {
+      close();
+    }
+  }
+
+  /**
+   * We will not use this connection anymore. If it's not being used, we close
+   * it. Otherwise, we let release() do it once we are done with it.
+   */
+  public synchronized void close() {
+    this.closed = true;
+    if (this.numThreads == 0) {
+      ClientProtocol proxy = this.client.getProxy();
+      // Nobody should be using this anymore so it should close right away
+      RPC.stopProxy(proxy);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6989725/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
new file mode 100644
index 0000000..d93d498
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements a pool of connections for the {@link Router} to be able to open
+ * many connections to many Namenodes.
+ */
+public class ConnectionManager {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ConnectionManager.class);
+
+  /** Number of parallel new connections to create. */
+  protected static final int MAX_NEW_CONNECTIONS = 100;
+
+  /** Minimum amount of active connections: 50%. */
+  protected static final float MIN_ACTIVE_RATIO = 0.5f;
+
+
+  /** Configuration for the connection manager, pool and sockets. */
+  private final Configuration conf;
+
+  /** Min number of connections per user + nn. */
+  private final int minSize = 1;
+  /** Max number of connections per user + nn. */
+  private final int maxSize;
+
+  /** How often we close a pool for a particular user + nn. */
+  private final long poolCleanupPeriodMs;
+  /** How often we close a connection in a pool. */
+  private final long connectionCleanupPeriodMs;
+
+  /** Map of connection pools, one pool per user + NN. */
+  private final Map<ConnectionPoolId, ConnectionPool> pools;
+  /** Lock for accessing pools. */
+  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+
+  /** Queue for creating new connections. */
+  private final BlockingQueue<ConnectionPool> creatorQueue =
+      new ArrayBlockingQueue<>(MAX_NEW_CONNECTIONS);
+  /** Create new connections asynchronously. */
+  private final ConnectionCreator creator;
+  /** Periodic executor to remove stale connection pools. */
+  private final ScheduledThreadPoolExecutor cleaner =
+      new ScheduledThreadPoolExecutor(1);
+
+  /** If the connection manager is running. */
+  private boolean running = false;
+
+
+  /**
+   * Creates a proxy client connection pool manager.
+   *
+   * @param config Configuration for the connections.
+   * @param minPoolSize Min size of the connection pool.
+   * @param maxPoolSize Max size of the connection pool.
+   */
+  public ConnectionManager(Configuration config) {
+    this.conf = config;
+
+    // Configure minimum and maximum connection pools
+    this.maxSize = this.conf.getInt(
+        DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE,
+        DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT);
+
+    // Map with the connections indexed by UGI and Namenode
+    this.pools = new HashMap<>();
+
+    // Create connections in a thread asynchronously
+    this.creator = new ConnectionCreator(creatorQueue);
+    this.creator.setDaemon(true);
+
+    // Cleanup periods
+    this.poolCleanupPeriodMs = this.conf.getLong(
+        DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN,
+        DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN_DEFAULT);
+    LOG.info("Cleaning connection pools every {} seconds",
+        TimeUnit.MILLISECONDS.toSeconds(this.poolCleanupPeriodMs));
+    this.connectionCleanupPeriodMs = this.conf.getLong(
+        DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS,
+        DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT);
+    LOG.info("Cleaning connections every {} seconds",
+        TimeUnit.MILLISECONDS.toSeconds(this.connectionCleanupPeriodMs));
+  }
+
+  /**
+   * Start the connection manager.
+   */
+  public void start() {
+    // Start the thread that creates connections asynchronously
+    this.creator.start();
+
+    // Schedule a task to remove stale connection pools and sockets
+    long recyleTimeMs = Math.min(
+        poolCleanupPeriodMs, connectionCleanupPeriodMs);
+    LOG.info("Cleaning every {} seconds",
+        TimeUnit.MILLISECONDS.toSeconds(recyleTimeMs));
+    this.cleaner.scheduleAtFixedRate(
+        new CleanupTask(), 0, recyleTimeMs, TimeUnit.MILLISECONDS);
+
+    // Mark the manager as running
+    this.running = true;
+  }
+
+  /**
+   * Stop the connection manager by closing all the pools.
+   */
+  public void close() {
+    this.creator.shutdown();
+    this.cleaner.shutdown();
+    this.running = false;
+
+    writeLock.lock();
+    try {
+      for (ConnectionPool pool : this.pools.values()) {
+        pool.close();
+      }
+      this.pools.clear();
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Fetches the next available proxy client in the pool. Each client 
connection
+   * is reserved for a single user and cannot be reused until free.
+   *
+   * @param ugi User group information.
+   * @param nnAddress Namenode address for the connection.
+   * @return Proxy client to connect to nnId as UGI.
+   * @throws IOException If the connection cannot be obtained.
+   */
+  public ConnectionContext getConnection(
+      UserGroupInformation ugi, String nnAddress) throws IOException {
+
+    // Check if the manager is shutdown
+    if (!this.running) {
+      LOG.error(
+          "Cannot get a connection to {} because the manager isn't running",
+          nnAddress);
+      return null;
+    }
+
+    // Try to get the pool if created
+    ConnectionPoolId connectionId = new ConnectionPoolId(ugi, nnAddress);
+    ConnectionPool pool = null;
+    readLock.lock();
+    try {
+      pool = this.pools.get(connectionId);
+    } finally {
+      readLock.unlock();
+    }
+
+    // Create the pool if not created before
+    if (pool == null) {
+      writeLock.lock();
+      try {
+        pool = this.pools.get(connectionId);
+        if (pool == null) {
+          pool = new ConnectionPool(
+              this.conf, nnAddress, ugi, this.minSize, this.maxSize);
+          this.pools.put(connectionId, pool);
+        }
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    ConnectionContext conn = pool.getConnection();
+
+    // Add a new connection to the pool if it wasn't usable
+    if (conn == null || !conn.isUsable()) {
+      if (!this.creatorQueue.offer(pool)) {
+        LOG.error("Cannot add more than {} connections at the same time",
+            MAX_NEW_CONNECTIONS);
+      }
+    }
+
+    if (conn != null && conn.isClosed()) {
+      LOG.error("We got a closed connection from {}", pool);
+      conn = null;
+    }
+
+    return conn;
+  }
+
+  /**
+   * Get the number of connection pools.
+   *
+   * @return Number of connection pools.
+   */
+  public int getNumConnectionPools() {
+    readLock.lock();
+    try {
+      return pools.size();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Get number of open connections.
+   *
+   * @return Number of open connections.
+   */
+  public int getNumConnections() {
+    int total = 0;
+    readLock.lock();
+    try {
+      for (ConnectionPool pool : this.pools.values()) {
+        total += pool.getNumConnections();
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return total;
+  }
+
+  /**
+   * Get number of active connections.
+   *
+   * @return Number of active connections.
+   */
+  public int getNumActiveConnections() {
+    int total = 0;
+    readLock.lock();
+    try {
+      for (ConnectionPool pool : this.pools.values()) {
+        total += pool.getNumActiveConnections();
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return total;
+  }
+
+  /**
+   * Get the number of connections to be created.
+   *
+   * @return Number of connections to be created.
+   */
+  public int getNumCreatingConnections() {
+    return this.creatorQueue.size();
+  }
+
+  /**
+   * Removes stale connections not accessed recently from the pool. This is
+   * invoked periodically.
+   */
+  private class CleanupTask implements Runnable {
+
+    @Override
+    public void run() {
+      long currentTime = Time.now();
+      List<ConnectionPoolId> toRemove = new LinkedList<>();
+
+      // Look for stale pools
+      readLock.lock();
+      try {
+        for (Entry<ConnectionPoolId, ConnectionPool> entry : pools.entrySet()) 
{
+          ConnectionPool pool = entry.getValue();
+          long lastTimeActive = pool.getLastActiveTime();
+          boolean isStale =
+              currentTime > (lastTimeActive + poolCleanupPeriodMs);
+          if (lastTimeActive > 0 && isStale) {
+            // Remove this pool
+            LOG.debug("Closing and removing stale pool {}", pool);
+            pool.close();
+            ConnectionPoolId poolId = entry.getKey();
+            toRemove.add(poolId);
+          } else {
+            // Keep this pool but clean connections inside
+            LOG.debug("Cleaning up {}", pool);
+            cleanup(pool);
+          }
+        }
+      } finally {
+        readLock.unlock();
+      }
+
+      // Remove stale pools
+      if (!toRemove.isEmpty()) {
+        writeLock.lock();
+        try {
+          for (ConnectionPoolId poolId : toRemove) {
+            pools.remove(poolId);
+          }
+        } finally {
+          writeLock.unlock();
+        }
+      }
+    }
+
+    /**
+     * Clean the unused connections for this pool.
+     *
+     * @param pool Connection pool to cleanup.
+     */
+    private void cleanup(ConnectionPool pool) {
+      if (pool.getNumConnections() > pool.getMinSize()) {
+        // Check if the pool hasn't been active in a while or not 50% are used
+        long timeSinceLastActive = Time.now() - pool.getLastActiveTime();
+        int total = pool.getNumConnections();
+        int active = getNumActiveConnections();
+        if (timeSinceLastActive > connectionCleanupPeriodMs ||
+            active < MIN_ACTIVE_RATIO * total) {
+          // Remove and close 1 connection
+          List<ConnectionContext> conns = pool.removeConnections(1);
+          for (ConnectionContext conn : conns) {
+            conn.close();
+          }
+          LOG.debug("Removed connection {} used {} seconds ago. " +
+              "Pool has {}/{} connections", pool.getConnectionPoolId(),
+              TimeUnit.MILLISECONDS.toSeconds(timeSinceLastActive),
+              pool.getNumConnections(), pool.getMaxSize());
+        }
+      }
+    }
+  }
+
+  /**
+   * Thread that creates connections asynchronously.
+   */
+  private static class ConnectionCreator extends Thread {
+    /** If the creator is running. */
+    private boolean running = true;
+    /** Queue to push work to. */
+    private BlockingQueue<ConnectionPool> queue;
+
+    ConnectionCreator(BlockingQueue<ConnectionPool> blockingQueue) {
+      super("Connection creator");
+      this.queue = blockingQueue;
+    }
+
+    @Override
+    public void run() {
+      while (this.running) {
+        try {
+          ConnectionPool pool = this.queue.take();
+          try {
+            int total = pool.getNumConnections();
+            int active = pool.getNumActiveConnections();
+            if (pool.getNumConnections() < pool.getMaxSize() &&
+                active >= MIN_ACTIVE_RATIO * total) {
+              ConnectionContext conn = pool.newConnection();
+              pool.addConnection(conn);
+            } else {
+              LOG.debug("Cannot add more than {} connections to {}",
+                  pool.getMaxSize(), pool);
+            }
+          } catch (IOException e) {
+            LOG.error("Cannot create a new connection", e);
+          }
+        } catch (InterruptedException e) {
+          LOG.error("The connection creator was interrupted");
+          this.running = false;
+        }
+      }
+    }
+
+    /**
+     * Stop this connection creator.
+     */
+    public void shutdown() {
+      this.running = false;
+      this.interrupt();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6989725/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
new file mode 100644
index 0000000..f76f621
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryUtils;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains a pool of connections for each User (including tokens) + NN. The
+ * RPC client maintains a single socket, to achieve throughput similar to a NN,
+ * each request is multiplexed across multiple sockets/connections from a
+ * pool.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ConnectionPool {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ConnectionPool.class);
+
+
+  /** Configuration settings for the connection pool. */
+  private final Configuration conf;
+
+  /** Identifier for this connection pool. */
+  private final ConnectionPoolId connectionPoolId;
+  /** Namenode this pool connects to. */
+  private final String namenodeAddress;
+  /** User for this connections. */
+  private final UserGroupInformation ugi;
+
+  /** Pool of connections. We mimic a COW array. */
+  private volatile List<ConnectionContext> connections = new ArrayList<>();
+  /** Connection index for round-robin. */
+  private final AtomicInteger clientIndex = new AtomicInteger(0);
+
+  /** Min number of connections per user. */
+  private final int minSize;
+  /** Max number of connections per user. */
+  private final int maxSize;
+
+  /** The last time a connection was active. */
+  private volatile long lastActiveTime = 0;
+
+
+  protected ConnectionPool(Configuration config, String address,
+      UserGroupInformation user, int minPoolSize, int maxPoolSize)
+          throws IOException {
+
+    this.conf = config;
+
+    // Connection pool target
+    this.ugi = user;
+    this.namenodeAddress = address;
+    this.connectionPoolId =
+        new ConnectionPoolId(this.ugi, this.namenodeAddress);
+
+    // Set configuration parameters for the pool
+    this.minSize = minPoolSize;
+    this.maxSize = maxPoolSize;
+
+    // Add minimum connections to the pool
+    for (int i=0; i<this.minSize; i++) {
+      ConnectionContext newConnection = newConnection();
+      this.connections.add(newConnection);
+    }
+    LOG.debug("Created connection pool \"{}\" with {} connections",
+        this.connectionPoolId, this.minSize);
+  }
+
+  /**
+   * Get the maximum number of connections allowed in this pool.
+   *
+   * @return Maximum number of connections.
+   */
+  protected int getMaxSize() {
+    return this.maxSize;
+  }
+
+  /**
+   * Get the minimum number of connections in this pool.
+   *
+   * @return Minimum number of connections.
+   */
+  protected int getMinSize() {
+    return this.minSize;
+  }
+
+  /**
+   * Get the connection pool identifier.
+   *
+   * @return Connection pool identifier.
+   */
+  protected ConnectionPoolId getConnectionPoolId() {
+    return this.connectionPoolId;
+  }
+
+  /**
+   * Return the next connection round-robin.
+   *
+   * @return Connection context.
+   */
+  protected ConnectionContext getConnection() {
+
+    this.lastActiveTime = Time.now();
+
+    // Get a connection from the pool following round-robin
+    ConnectionContext conn = null;
+    List<ConnectionContext> tmpConnections = this.connections;
+    int size = tmpConnections.size();
+    int threadIndex = this.clientIndex.getAndIncrement();
+    for (int i=0; i<size; i++) {
+      int index = (threadIndex + i) % size;
+      conn = tmpConnections.get(index);
+      if (conn != null && !conn.isUsable()) {
+        return conn;
+      }
+    }
+
+    // We return a connection even if it's active
+    return conn;
+  }
+
+  /**
+   * Add a connection to the current pool. It uses a Copy-On-Write approach.
+   *
+   * @param conns New connections to add to the pool.
+   */
+  public synchronized void addConnection(ConnectionContext conn) {
+    List<ConnectionContext> tmpConnections = new ArrayList<>(this.connections);
+    tmpConnections.add(conn);
+    this.connections = tmpConnections;
+
+    this.lastActiveTime = Time.now();
+  }
+
+  /**
+   * Remove connections from the current pool.
+   *
+   * @param num Number of connections to remove.
+   * @return Removed connections.
+   */
+  public synchronized List<ConnectionContext> removeConnections(int num) {
+    List<ConnectionContext> removed = new LinkedList<>();
+
+    // Remove and close the last connection
+    List<ConnectionContext> tmpConnections = new ArrayList<>();
+    for (int i=0; i<this.connections.size(); i++) {
+      ConnectionContext conn = this.connections.get(i);
+      if (i < this.minSize || i < this.connections.size() - num) {
+        tmpConnections.add(conn);
+      } else {
+        removed.add(conn);
+      }
+    }
+    this.connections = tmpConnections;
+
+    return removed;
+  }
+
+  /**
+   * Close the connection pool.
+   */
+  protected synchronized void close() {
+    long timeSinceLastActive = TimeUnit.MILLISECONDS.toSeconds(
+        Time.now() - getLastActiveTime());
+    LOG.debug("Shutting down connection pool \"{}\" used {} seconds ago",
+        this.connectionPoolId, timeSinceLastActive);
+
+    for (ConnectionContext connection : this.connections) {
+      connection.close();
+    }
+    this.connections.clear();
+  }
+
+  /**
+   * Number of connections in the pool.
+   *
+   * @return Number of connections.
+   */
+  protected int getNumConnections() {
+    return this.connections.size();
+  }
+
+  /**
+   * Number of active connections in the pool.
+   *
+   * @return Number of active connections.
+   */
+  protected int getNumActiveConnections() {
+    int ret = 0;
+
+    List<ConnectionContext> tmpConnections = this.connections;
+    for (ConnectionContext conn : tmpConnections) {
+      if (conn.isActive()) {
+        ret++;
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Get the last time the connection pool was used.
+   *
+   * @return Last time the connection pool was used.
+   */
+  protected long getLastActiveTime() {
+    return this.lastActiveTime;
+  }
+
+  @Override
+  public String toString() {
+    return this.connectionPoolId.toString();
+  }
+
+  /**
+   * Create a new proxy wrapper for a client NN connection.
+   * @return Proxy for the target ClientProtocol that contains the user's
+   *         security context.
+   * @throws IOException
+   */
+  public ConnectionContext newConnection() throws IOException {
+    return newConnection(this.conf, this.namenodeAddress, this.ugi);
+  }
+
+  /**
+   * Creates a proxy wrapper for a client NN connection. Each proxy contains
+   * context for a single user/security context. To maximize throughput it is
+   * recommended to use multiple connection per user+server, allowing multiple
+   * writes and reads to be dispatched in parallel.
+   *
+   * @param conf Configuration for the connection.
+   * @param nnAddress Address of server supporting the ClientProtocol.
+   * @param ugi User context.
+   * @return Proxy for the target ClientProtocol that contains the user's
+   *         security context.
+   * @throws IOException If it cannot be created.
+   */
+  protected static ConnectionContext newConnection(Configuration conf,
+      String nnAddress, UserGroupInformation ugi)
+          throws IOException {
+    RPC.setProtocolEngine(
+        conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
+
+    final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(
+        conf,
+        HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
+        HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
+        HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
+        HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
+        HdfsConstants.SAFEMODE_EXCEPTION_CLASS_NAME);
+
+    SocketFactory factory = SocketFactory.getDefault();
+    if (UserGroupInformation.isSecurityEnabled()) {
+      SaslRpcServer.init(conf);
+    }
+    InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress);
+    final long version = 
RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
+    ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
+        ClientNamenodeProtocolPB.class, version, socket, ugi, conf,
+        factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
+    ClientProtocol client = new ClientNamenodeProtocolTranslatorPB(proxy);
+    Text dtService = SecurityUtil.buildTokenService(socket);
+
+    ProxyAndInfo<ClientProtocol> clientProxy =
+        new ProxyAndInfo<ClientProtocol>(client, dtService, socket);
+    ConnectionContext connection = new ConnectionContext(clientProxy);
+    return connection;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6989725/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java
new file mode 100644
index 0000000..a3a78de
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPoolId.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * Identifier for a connection for a user to a namenode.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ConnectionPoolId implements Comparable<ConnectionPoolId> {
+
+  /** Namenode identifier. */
+  private final String nnId;
+  /** Information about the user. */
+  private final UserGroupInformation ugi;
+
+  /**
+   * New connection pool identifier.
+   *
+   * @param ugi Information of the user issuing the request.
+   * @param nnId Namenode address with port.
+   */
+  public ConnectionPoolId(final UserGroupInformation ugi, final String nnId) {
+    this.nnId = nnId;
+    this.ugi = ugi;
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = new HashCodeBuilder(17, 31)
+        .append(this.nnId)
+        .append(this.ugi.toString())
+        .append(this.getTokenIds())
+        .toHashCode();
+    return hash;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof ConnectionPoolId) {
+      ConnectionPoolId other = (ConnectionPoolId) o;
+      if (!this.nnId.equals(other.nnId)) {
+        return false;
+      }
+      if (!this.ugi.toString().equals(other.ugi.toString())) {
+        return false;
+      }
+      String thisTokens = this.getTokenIds().toString();
+      String otherTokens = other.getTokenIds().toString();
+      return thisTokens.equals(otherTokens);
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return this.ugi + " " + this.getTokenIds() + "->" + this.nnId;
+  }
+
+  @Override
+  public int compareTo(ConnectionPoolId other) {
+    int ret = this.nnId.compareTo(other.nnId);
+    if (ret == 0) {
+      ret = this.ugi.toString().compareTo(other.ugi.toString());
+    }
+    if (ret == 0) {
+      String thisTokens = this.getTokenIds().toString();
+      String otherTokens = other.getTokenIds().toString();
+      ret = thisTokens.compareTo(otherTokens);
+    }
+    return ret;
+  }
+
+  /**
+   * Get the token identifiers for this connection.
+   * @return List with the token identifiers.
+   */
+  private List<String> getTokenIds() {
+    List<String> tokenIds = new ArrayList<>();
+    Collection<Token<? extends TokenIdentifier>> tokens = this.ugi.getTokens();
+    for (Token<? extends TokenIdentifier> token : tokens) {
+      byte[] tokenIdBytes = token.getIdentifier();
+      String tokenId = Arrays.toString(tokenIdBytes);
+      tokenIds.add(tokenId);
+    }
+    Collections.sort(tokenIds);
+    return tokenIds;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6989725/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java
index da6066b..a90c460 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java
@@ -17,22 +17,52 @@
  */
 package org.apache.hadoop.hdfs.server.federation.router;
 
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
 /**
- * Interface for objects that are unique to a namespace.
+ * Base class for objects that are unique to a namespace.
  */
-public interface RemoteLocationContext {
+public abstract class RemoteLocationContext
+    implements Comparable<RemoteLocationContext> {
 
   /**
    * Returns an identifier for a unique namespace.
    *
    * @return Namespace identifier.
    */
-  String getNameserviceId();
+  public abstract String getNameserviceId();
 
   /**
    * Destination in this location. For example the path in a remote namespace.
    *
    * @return Destination in this location.
    */
-  String getDest();
+  public abstract String getDest();
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(17, 31)
+        .append(getNameserviceId())
+        .append(getDest())
+        .toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof RemoteLocationContext) {
+      RemoteLocationContext other = (RemoteLocationContext) obj;
+      return this.getNameserviceId().equals(other.getNameserviceId()) &&
+          this.getDest().equals(other.getDest());
+    }
+    return false;
+  }
+
+  @Override
+  public int compareTo(RemoteLocationContext info) {
+    int ret = this.getNameserviceId().compareTo(info.getNameserviceId());
+    if (ret == 0) {
+      ret = this.getDest().compareTo(info.getDest());
+    }
+    return ret;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6989725/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
new file mode 100644
index 0000000..cd57d45
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Determines the remote client protocol method and the parameter list for a
+ * specific location.
+ */
+public class RemoteMethod {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RemoteMethod.class);
+
+
+  /** List of parameters: static and dynamic values, matchings types. */
+  private final Object[] params;
+  /** List of method parameters types, matches parameters. */
+  private final Class<?>[] types;
+  /** String name of the ClientProtocol method. */
+  private final String methodName;
+
+  /**
+   * Create a method with no parameters.
+   *
+   * @param method The string name of the ClientProtocol method.
+   */
+  public RemoteMethod(String method) {
+    this.params = null;
+    this.types = null;
+    this.methodName = method;
+  }
+
+  /**
+   * Creates a remote method generator.
+   *
+   * @param method The string name of the ClientProtocol method.
+   * @param pTypes A list of types to use to locate the specific method.
+   * @param pParams A list of parameters for the method. The order of the
+   *          parameter list must match the order and number of the types.
+   *          Parameters are grouped into 2 categories:
+   *          <ul>
+   *          <li>Static parameters that are immutable across locations.
+   *          <li>Dynamic parameters that are determined for each location by a
+   *          RemoteParam object. To specify a dynamic parameter, pass an
+   *          instance of RemoteParam in place of the parameter value.
+   *          </ul>
+   * @throws IOException If the types and parameter lists are not valid.
+   */
+  public RemoteMethod(String method, Class<?>[] pTypes, Object... pParams)
+      throws IOException {
+
+    if (pParams.length != pTypes.length) {
+      throw new IOException("Invalid parameters for method " + method);
+    }
+
+    this.params = pParams;
+    this.types = pTypes;
+    this.methodName = method;
+  }
+
+  /**
+   * Get the represented java method.
+   *
+   * @return Method
+   * @throws IOException If the method cannot be found.
+   */
+  public Method getMethod() throws IOException {
+    try {
+      if (types != null) {
+        return ClientProtocol.class.getDeclaredMethod(methodName, types);
+      } else {
+        return ClientProtocol.class.getDeclaredMethod(methodName);
+      }
+    } catch (NoSuchMethodException e) {
+      // Re-throw as an IOException
+      LOG.error("Cannot get method {} with types {}",
+          methodName, Arrays.toString(types), e);
+      throw new IOException(e);
+    } catch (SecurityException e) {
+      LOG.error("Cannot access method {} with types {}",
+          methodName, Arrays.toString(types), e);
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Get the calling types for this method.
+   *
+   * @return An array of calling types.
+   */
+  public Class<?>[] getTypes() {
+    return this.types;
+  }
+
+  /**
+   * Generate a list of parameters for this specific location using no context.
+   *
+   * @return A list of parameters for the method customized for the location.
+   */
+  public Object[] getParams() {
+    return this.getParams(null);
+  }
+
+  /**
+   * Get the name of the method.
+   *
+   * @return Name of the method.
+   */
+  public String getMethodName() {
+    return this.methodName;
+  }
+
+  /**
+   * Generate a list of parameters for this specific location. Parameters are
+   * grouped into 2 categories:
+   * <ul>
+   * <li>Static parameters that are immutable across locations.
+   * <li>Dynamic parameters that are determined for each location by a
+   * RemoteParam object.
+   * </ul>
+   *
+   * @param context The context identifying the location.
+   * @return A list of parameters for the method customized for the location.
+   */
+  public Object[] getParams(RemoteLocationContext context) {
+    if (this.params == null) {
+      return new Object[] {};
+    }
+    Object[] objList = new Object[this.params.length];
+    for (int i = 0; i < this.params.length; i++) {
+      Object currentObj = this.params[i];
+      if (currentObj instanceof RemoteParam) {
+        // Map the parameter using the context
+        RemoteParam paramGetter = (RemoteParam) currentObj;
+        objList[i] = paramGetter.getParameterForContext(context);
+      } else {
+        objList[i] = currentObj;
+      }
+    }
+    return objList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6989725/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java
new file mode 100644
index 0000000..8816ff6
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.util.Map;
+
+/**
+ * A dynamically assignable parameter that is location-specific.
+ * <p>
+ * There are 2 ways this mapping is determined:
+ * <ul>
+ * <li>Default: Uses the RemoteLocationContext's destination
+ * <li>Map: Uses the value of the RemoteLocationContext key provided in the
+ * parameter map.
+ * </ul>
+ */
+public class RemoteParam {
+
+  private final Map<? extends Object, ? extends Object> paramMap;
+
+  /**
+   * Constructs a default remote parameter. Always maps the value to the
+   * destination of the provided RemoveLocationContext.
+   */
+  public RemoteParam() {
+    this.paramMap = null;
+  }
+
+  /**
+   * Constructs a map based remote parameter. Determines the value using the
+   * provided RemoteLocationContext as a key into the map.
+   *
+   * @param map Map with RemoteLocationContext keys.
+   */
+  public RemoteParam(
+      Map<? extends RemoteLocationContext, ? extends Object> map) {
+    this.paramMap = map;
+  }
+
+  /**
+   * Determine the appropriate value for this parameter based on the location.
+   *
+   * @param context Context identifying the location.
+   * @return A parameter specific to this location.
+   */
+  public Object getParameterForContext(RemoteLocationContext context) {
+    if (context == null) {
+      return null;
+    } else if (this.paramMap != null) {
+      return this.paramMap.get(context);
+    } else {
+      // Default case
+      return context.getDest();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6989725/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
index fe0d02a..019a5cd 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
@@ -22,12 +22,14 @@ import static 
org.apache.hadoop.hdfs.server.federation.router.FederationUtil.new
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import 
org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
@@ -36,6 +38,8 @@ import 
org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Router that provides a unified view of multiple federated HDFS clusters. It
@@ -60,7 +64,7 @@ import org.apache.hadoop.util.StringUtils;
 @InterfaceStability.Evolving
 public class Router extends CompositeService {
 
-  private static final Log LOG = LogFactory.getLog(Router.class);
+  private static final Logger LOG = LoggerFactory.getLogger(Router.class);
 
 
   /** Configuration for the Router. */
@@ -71,6 +75,7 @@ public class Router extends CompositeService {
 
   /** RPC interface to the client. */
   private RouterRpcServer rpcServer;
+  private InetSocketAddress rpcAddress;
 
   /** Interface with the State Store. */
   private StateStoreService stateStore;
@@ -105,9 +110,6 @@ public class Router extends CompositeService {
   protected void serviceInit(Configuration configuration) throws Exception {
     this.conf = configuration;
 
-    // TODO Interface to the State Store
-    this.stateStore = null;
-
     // Resolver to track active NNs
     this.namenodeResolver = newActiveNamenodeResolver(
         this.conf, this.stateStore);
@@ -122,6 +124,15 @@ public class Router extends CompositeService {
       throw new IOException("Cannot find subcluster resolver");
     }
 
+    if (conf.getBoolean(
+        DFSConfigKeys.DFS_ROUTER_RPC_ENABLE,
+        DFSConfigKeys.DFS_ROUTER_RPC_ENABLE_DEFAULT)) {
+      // Create RPC server
+      this.rpcServer = createRpcServer();
+      addService(this.rpcServer);
+      this.setRpcServerAddress(rpcServer.getRpcAddress());
+    }
+
     super.serviceInit(conf);
   }
 
@@ -171,11 +182,13 @@ public class Router extends CompositeService {
       router.init(conf);
       router.start();
     } catch (Throwable e) {
-      LOG.error("Failed to start router.", e);
+      LOG.error("Failed to start router", e);
       terminate(1, e);
     }
   }
 
+
+
   /////////////////////////////////////////////////////////
   // RPC Server
   /////////////////////////////////////////////////////////
@@ -183,7 +196,7 @@ public class Router extends CompositeService {
   /**
    * Create a new Router RPC server to proxy ClientProtocol requests.
    *
-   * @return RouterRpcServer
+   * @return New Router RPC Server.
    * @throws IOException If the router RPC server was not started.
    */
   protected RouterRpcServer createRpcServer() throws IOException {
@@ -200,6 +213,35 @@ public class Router extends CompositeService {
     return this.rpcServer;
   }
 
+  /**
+   * Set the current RPC socket for the router.
+   *
+   * @param rpcAddress RPC address.
+   */
+  protected void setRpcServerAddress(InetSocketAddress address) {
+    this.rpcAddress = address;
+
+    // Use the RPC address as our unique router Id
+    if (this.rpcAddress != null) {
+      try {
+        String hostname = InetAddress.getLocalHost().getHostName();
+        setRouterId(hostname + ":" + this.rpcAddress.getPort());
+      } catch (UnknownHostException ex) {
+        LOG.error("Cannot set unique router ID, address not resolvable {}",
+            this.rpcAddress);
+      }
+    }
+  }
+
+  /**
+   * Get the current RPC socket address for the router.
+   *
+   * @return InetSocketAddress
+   */
+  public InetSocketAddress getRpcServerAddress() {
+    return this.rpcAddress;
+  }
+
   /////////////////////////////////////////////////////////
   // Submodule getters
   /////////////////////////////////////////////////////////


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to