http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java
new file mode 100644
index 0000000..a1adf77
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceStatus;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link Router} periodically checks the state of a Namenode (usually on
+ * the same server) and reports its high availability (HA) state and
+ * load/space status to the
+ * {@link org.apache.hadoop.hdfs.server.federation.store.StateStoreService}.
+ * Note that this is an optional role as a Router can be independent of any
+ * subcluster.
+ * <p>
+ * For performance with Namenode HA, the Router uses the high availability
+ * state information in the State Store to forward the request to the
+ * Namenode that is most likely to be active.
+ * <p>
+ * Note that this service can be embedded into the Namenode itself to simplify
+ * the operation.
+ */
+public class NamenodeHeartbeatService extends PeriodicService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(NamenodeHeartbeatService.class);
+
+
+  /** Configuration for the heartbeat. */
+  private Configuration conf;
+
+  /** Resolver used to register the state of the tracked Namenode. */
+  private final ActiveNamenodeResolver resolver;
+
+  /** Identifiers of the tracked NN: nameservice and namenode. */
+  private final String nameserviceId;
+  private final String namenodeId;
+
+  /** Namenode HA target. */
+  private NNHAServiceTarget localTarget;
+  /** RPC address for the namenode. */
+  private String rpcAddress;
+  /** Service RPC address for the namenode. */
+  private String serviceAddress;
+  /** Lifeline RPC address for the namenode. */
+  private String lifelineAddress;
+  /** HTTP address for the namenode. */
+  private String webAddress;
+
+  /**
+   * Create a new Namenode status updater.
+   * @param resolver Namenode resolver service to handle NN registration.
+   * @param nsId Identifier of the nameservice.
+   * @param nnId Identifier of the namenode in HA.
+   */
+  public NamenodeHeartbeatService(
+      ActiveNamenodeResolver resolver, String nsId, String nnId) {
+    super(NamenodeHeartbeatService.class.getSimpleName() +
+        (nsId == null ? "" : " " + nsId) +
+        (nnId == null ? "" : " " + nnId));
+
+    this.resolver = resolver;
+
+    this.nameserviceId = nsId;
+    this.namenodeId = nnId;
+
+  }
+
+  @Override
+  protected void serviceInit(Configuration configuration) throws Exception {
+
+    this.conf = configuration;
+
+    String nnDesc = nameserviceId;
+    if (this.namenodeId != null && !this.namenodeId.isEmpty()) {
+      this.localTarget = new NNHAServiceTarget(
+          conf, nameserviceId, namenodeId);
+      nnDesc += "-" + namenodeId;
+    } else {
+      this.localTarget = null;
+    }
+
+    // Get the RPC address for the clients to connect
+    this.rpcAddress = getRpcAddress(conf, nameserviceId, namenodeId);
+    LOG.info("{} RPC address: {}", nnDesc, rpcAddress);
+
+    // Get the Service RPC address for monitoring
+    this.serviceAddress =
+        DFSUtil.getNamenodeServiceAddr(conf, nameserviceId, namenodeId);
+    if (this.serviceAddress == null) {
+      LOG.error("Cannot locate RPC service address for NN {}, " +
+          "using RPC address {}", nnDesc, this.rpcAddress);
+      this.serviceAddress = this.rpcAddress;
+    }
+    LOG.info("{} Service RPC address: {}", nnDesc, serviceAddress);
+
+    // Get the Lifeline RPC address for faster monitoring
+    this.lifelineAddress =
+        DFSUtil.getNamenodeLifelineAddr(conf, nameserviceId, namenodeId);
+    if (this.lifelineAddress == null) {
+      this.lifelineAddress = this.serviceAddress;
+    }
+    LOG.info("{} Lifeline RPC address: {}", nnDesc, lifelineAddress);
+
+    // Get the Web address for UI
+    this.webAddress =
+        DFSUtil.getNamenodeWebAddr(conf, nameserviceId, namenodeId);
+    LOG.info("{} Web address: {}", nnDesc, webAddress);
+
+    this.setIntervalMs(conf.getLong(
+        DFS_ROUTER_HEARTBEAT_INTERVAL_MS,
+        DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT));
+
+
+    super.serviceInit(configuration);
+  }
+
+  @Override
+  public void periodicInvoke() {
+    updateState();
+  }
+
+  /**
+   * Get the RPC address for a Namenode.
+   * @param conf Configuration.
+   * @param nsId Name service identifier.
+   * @param nnId Name node identifier.
+   * @return RPC address in format hostname:1234.
+   */
+  private static String getRpcAddress(
+      Configuration conf, String nsId, String nnId) {
+
+    // Get it from the regular RPC setting
+    String confKey = DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
+    String ret = conf.get(confKey);
+
+    if (nsId != null || nnId != null) {
+      // Get it for the proper nameservice and namenode
+      confKey = DFSUtil.addKeySuffixes(confKey, nsId, nnId);
+      ret = conf.get(confKey);
+
+      // If not available, get it from the map
+      if (ret == null) {
+        Map<String, InetSocketAddress> rpcAddresses =
+            DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null);
+        InetSocketAddress sockAddr = null;
+        if (nnId != null) {
+          sockAddr = rpcAddresses.get(nnId);
+        } else if (rpcAddresses.size() == 1) {
+          // Get the only namenode in the namespace
+          sockAddr = rpcAddresses.values().iterator().next();
+        }
+        if (sockAddr != null) {
+          InetAddress addr = sockAddr.getAddress();
+          ret = addr.getHostName() + ":" + sockAddr.getPort();
+        }
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Update the state of the Namenode.
+   */
+  private void updateState() {
+    NamenodeStatusReport report = getNamenodeStatusReport();
+    if (!report.registrationValid()) {
+      // Not operational
+      LOG.error("Namenode is not operational: {}", getNamenodeDesc());
+    } else if (report.haStateValid()) {
+      // block and HA status available
+      LOG.debug("Received service state: {} from HA namenode: {}",
+          report.getState(), getNamenodeDesc());
+    } else if (localTarget == null) {
+      // block info available, HA status not expected
+      LOG.debug("Reporting non-HA namenode as operational: {}",
+          getNamenodeDesc());
+    } else {
+      // block info available, HA status should be available, but was not
+      // fetched; do nothing and let the current state stand
+      return;
+    }
+    try {
+      if (!resolver.registerNamenode(report)) {
+        LOG.warn("Cannot register namenode {}", report);
+      }
+    } catch (IOException e) {
+      LOG.info("Cannot register namenode in the State Store");
+    } catch (Exception ex) {
+      LOG.error("Unhandled exception updating NN registration for {}",
+          getNamenodeDesc(), ex);
+    }
+  }
+
+  /**
+   * Get the status report for the Namenode monitored by this heartbeater.
+   * @return Namenode status report.
+   */
+  protected NamenodeStatusReport getNamenodeStatusReport() {
+    NamenodeStatusReport report = new NamenodeStatusReport(nameserviceId,
+        namenodeId, rpcAddress, serviceAddress, lifelineAddress, webAddress);
+
+    try {
+      LOG.debug("Probing NN at service address: {}", serviceAddress);
+
+      URI serviceURI = new URI("hdfs://" + serviceAddress);
+      // Read the filesystem info from RPC (required)
+      NamenodeProtocol nn = NameNodeProxies
+          .createProxy(this.conf, serviceURI, NamenodeProtocol.class)
+          .getProxy();
+
+      if (nn != null) {
+        NamespaceInfo info = nn.versionRequest();
+        if (info != null) {
+          report.setNamespaceInfo(info);
+        }
+      }
+      if (!report.registrationValid()) {
+        return report;
+      }
+
+      // Check for safemode from the client protocol. Currently optional, but
+      // should be required at some point for QoS
+      try {
+        ClientProtocol client = NameNodeProxies
+            .createProxy(this.conf, serviceURI, ClientProtocol.class)
+            .getProxy();
+        if (client != null) {
+          boolean isSafeMode = client.setSafeMode(
+              SafeModeAction.SAFEMODE_GET, false);
+          report.setSafeMode(isSafeMode);
+        }
+      } catch (Exception e) {
+        LOG.error("Cannot fetch safemode state for {}", getNamenodeDesc(), e);
+      }
+
+      // Read the stats from JMX (optional)
+      updateJMXParameters(webAddress, report);
+
+      if (localTarget != null) {
+        // Try to get the HA status
+        try {
+          // Determine if NN is active
+          // TODO: dynamic timeout
+          HAServiceProtocol haProtocol = localTarget.getProxy(conf, 30*1000);
+          HAServiceStatus status = haProtocol.getServiceStatus();
+          report.setHAServiceState(status.getState());
+        } catch (Throwable e) {
+          if (e.getMessage().startsWith("HA for namenode is not enabled")) {
+            LOG.error("HA for {} is not enabled", getNamenodeDesc());
+            localTarget = null;
+          } else {
+            // Failed to fetch HA status, ignoring failure
+            LOG.error("Cannot fetch HA status for {}: {}",
+                getNamenodeDesc(), e.getMessage(), e);
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Cannot communicate with {}: {}",
+          getNamenodeDesc(), e.getMessage());
+    } catch (Throwable e) {
+      // Generic error that we don't know about
+      LOG.error("Unexpected exception while communicating with {}: {}",
+          getNamenodeDesc(), e.getMessage(), e);
+    }
+    return report;
+  }
+
+  /**
+   * Get the description of the Namenode to monitor.
+   * @return Description of the Namenode to monitor.
+   */
+  public String getNamenodeDesc() {
+    if (namenodeId != null && !namenodeId.isEmpty()) {
+      return nameserviceId + "-" + namenodeId + ":" + serviceAddress;
+    } else {
+      return nameserviceId + ":" + serviceAddress;
+    }
+  }
+
+  /**
+   * Get the parameters for a Namenode from JMX and add them to the report.
+   * @param address Web interface of the Namenode to monitor.
+   * @param report Namenode status report to update with JMX data.
+   */
+  private void updateJMXParameters(
+      String address, NamenodeStatusReport report) {
+    try {
+      // TODO part of this should be moved to its own utility
+      String query = "Hadoop:service=NameNode,name=FSNamesystem*";
+      JSONArray aux = FederationUtil.getJmx(query, address);
+      if (aux != null) {
+        for (int i = 0; i < aux.length(); i++) {
+          JSONObject jsonObject = aux.getJSONObject(i);
+          String name = jsonObject.getString("name");
+          if (name.equals("Hadoop:service=NameNode,name=FSNamesystemState")) {
+            report.setDatanodeInfo(
+                jsonObject.getInt("NumLiveDataNodes"),
+                jsonObject.getInt("NumDeadDataNodes"),
+                jsonObject.getInt("NumDecommissioningDataNodes"),
+                jsonObject.getInt("NumDecomLiveDataNodes"),
+                jsonObject.getInt("NumDecomDeadDataNodes"));
+          } else if (name.equals(
+              "Hadoop:service=NameNode,name=FSNamesystem")) {
+            report.setNamesystemInfo(
+                jsonObject.getLong("CapacityRemaining"),
+                jsonObject.getLong("CapacityTotal"),
+                jsonObject.getLong("FilesTotal"),
+                jsonObject.getLong("BlocksTotal"),
+                jsonObject.getLong("MissingBlocks"),
+                jsonObject.getLong("PendingReplicationBlocks"),
+                jsonObject.getLong("UnderReplicatedBlocks"),
+                jsonObject.getLong("PendingDeletionBlocks"),
+                jsonObject.getLong("ProvidedCapacityTotal"));
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Cannot get stat from {} using JMX", getNamenodeDesc(), e);
+    }
+  }
+}
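
For context, a minimal wiring sketch for this service (hypothetical, not part
of the patch; the createResolver helper and the "ns0"/"nn1" identifiers are
assumptions made for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;

public class HeartbeatWiringSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    // Hypothetical helper; in the Router this resolver is created through
    // FederationUtil.newActiveNamenodeResolver(...).
    ActiveNamenodeResolver resolver = createResolver(conf);
    // Track namenode "nn1" of nameservice "ns0" (example identifiers).
    NamenodeHeartbeatService heartbeat =
        new NamenodeHeartbeatService(resolver, "ns0", "nn1");
    heartbeat.init(conf);  // resolves RPC/service/lifeline/web addresses
    heartbeat.start();     // schedules periodicInvoke() per heartbeat.interval
  }

  // Placeholder for whatever resolver implementation is in use.
  private static ActiveNamenodeResolver createResolver(Configuration conf) {
    throw new UnsupportedOperationException("illustrative placeholder");
  }
}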

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java
new file mode 100644
index 0000000..5e12222
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Service to periodically execute a runnable.
+ */
+public abstract class PeriodicService extends AbstractService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PeriodicService.class);
+
+  /** Default interval in milliseconds for the periodic service. */
+  private static final long DEFAULT_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1);
+
+
+  /** Interval for running the periodic service in milliseconds. */
+  private long intervalMs;
+  /** Name of the service. */
+  private final String serviceName;
+
+  /** Scheduler for the periodic service. */
+  private final ScheduledExecutorService scheduler;
+
+  /** If the service is running. */
+  private volatile boolean isRunning = false;
+
+  /** Number of times the service has run. */
+  private long runCount;
+  /** Number of errors since the service started. */
+  private long errorCount;
+  /** Last time the service ran successfully, in milliseconds. */
+  private long lastRun;
+
+  /**
+   * Create a new periodic update service.
+   *
+   * @param name Name of the service.
+   */
+  public PeriodicService(String name) {
+    this(name, DEFAULT_INTERVAL_MS);
+  }
+
+  /**
+   * Create a new periodic update service.
+   *
+   * @param name Name of the service.
+   * @param interval Interval for the periodic service in milliseconds.
+   */
+  public PeriodicService(String name, long interval) {
+    super(name);
+    this.serviceName = name;
+    this.intervalMs = interval;
+
+    ThreadFactory threadFactory = new ThreadFactoryBuilder()
+        .setNameFormat(this.getName() + "-%d")
+        .build();
+    this.scheduler = Executors.newScheduledThreadPool(1, threadFactory);
+  }
+
+  /**
+   * Set the interval for the periodic service.
+   *
+   * @param interval Interval in milliseconds.
+   */
+  protected void setIntervalMs(long interval) {
+    if (getServiceState() == STATE.STARTED) {
+      throw new ServiceStateException("Periodic service already started");
+    } else {
+      this.intervalMs = interval;
+    }
+  }
+
+  /**
+   * Get the interval for the periodic service.
+   *
+   * @return Interval in milliseconds.
+   */
+  protected long getIntervalMs() {
+    return this.intervalMs;
+  }
+
+  /**
+   * Get how many times we failed to run the periodic service.
+   *
+   * @return Times we failed to run the periodic service.
+   */
+  protected long getErrorCount() {
+    return this.errorCount;
+  }
+
+  /**
+   * Get how many times the periodic service has run.
+   *
+   * @return Times the periodic service has run.
+   */
+  protected long getRunCount() {
+    return this.runCount;
+  }
+
+  /**
+   * Get the last time the periodic service was executed.
+   *
+   * @return Last time the periodic service was executed.
+   */
+  protected long getLastUpdate() {
+    return this.lastRun;
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    LOG.info("Starting periodic service {}", this.serviceName);
+    startPeriodic();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    stopPeriodic();
+    LOG.info("Stopping periodic service {}", this.serviceName);
+    super.serviceStop();
+  }
+
+  /**
+   * Stop the periodic task.
+   */
+  protected synchronized void stopPeriodic() {
+    if (this.isRunning) {
+      LOG.info("{} is shutting down", this.serviceName);
+      this.isRunning = false;
+      this.scheduler.shutdownNow();
+    }
+  }
+
+  /**
+   * Start the periodic execution.
+   */
+  protected synchronized void startPeriodic() {
+    stopPeriodic();
+
+    // Create the runnable service
+    Runnable updateRunnable = new Runnable() {
+      @Override
+      public void run() {
+        LOG.debug("Running {} update task", serviceName);
+        try {
+          if (!isRunning) {
+            return;
+          }
+          periodicInvoke();
+          runCount++;
+          lastRun = Time.now();
+        } catch (Exception ex) {
+          errorCount++;
+          LOG.warn("{} service threw an exception", serviceName, ex);
+        }
+      }
+    };
+
+    // Start the execution of the periodic service
+    this.isRunning = true;
+    this.scheduler.scheduleWithFixedDelay(
+        updateRunnable, 0, this.intervalMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Method that the service will run periodically.
+   */
+  protected abstract void periodicInvoke();
+}
\ No newline at end of file
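
PeriodicService only asks subclasses to implement periodicInvoke(); a minimal
sketch of such a subclass (hypothetical, for illustration only):

import org.apache.hadoop.conf.Configuration;

/** Example service that runs once per second and prints its run count. */
public class TickService extends PeriodicService {

  public TickService() {
    super("TickService", 1000); // name and interval in milliseconds
  }

  @Override
  protected void periodicInvoke() {
    System.out.println("runs so far: " + getRunCount());
  }

  public static void main(String[] args) throws Exception {
    TickService svc = new TickService();
    svc.init(new Configuration()); // AbstractService lifecycle: INITED
    svc.start();                   // serviceStart() calls startPeriodic()
    Thread.sleep(5000);
    svc.stop();                    // serviceStop() calls stopPeriodic()
  }
}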

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java
new file mode 100644
index 0000000..dbb6ffa
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.fs.QuotaUsage;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Module that implements the quota relevant RPC calls
+ * {@link ClientProtocol#setQuota(String, long, long, StorageType)}
+ * and
+ * {@link ClientProtocol#getQuotaUsage(String)}
+ * in the {@link RouterRpcServer}.
+ */
+public class Quota {
+  private static final Logger LOG = LoggerFactory.getLogger(Quota.class);
+
+  /** RPC server to receive client calls. */
+  private final RouterRpcServer rpcServer;
+  /** RPC clients to connect to the Namenodes. */
+  private final RouterRpcClient rpcClient;
+  /** Router used in RouterRpcServer. */
+  private final Router router;
+
+  public Quota(Router router, RouterRpcServer server) {
+    this.router = router;
+    this.rpcServer = server;
+    this.rpcClient = server.getRPCClient();
+  }
+
+  /**
+   * Set quota for the federation path.
+   * @param path Federation path.
+   * @param namespaceQuota Name space quota.
+   * @param storagespaceQuota Storage space quota.
+   * @param type StorageType that the space quota is intended to be set on.
+   * @throws IOException If the quota cannot be set on the remote locations.
+   */
+  public void setQuota(String path, long namespaceQuota,
+      long storagespaceQuota, StorageType type) throws IOException {
+    rpcServer.checkOperation(OperationCategory.WRITE);
+
+    // Set quota for current path and its children mount table path.
+    final List<RemoteLocation> locations = getQuotaRemoteLocations(path);
+    if (LOG.isDebugEnabled()) {
+      for (RemoteLocation loc : locations) {
+        LOG.debug("Set quota for path: nsId: {}, dest: {}.",
+            loc.getNameserviceId(), loc.getDest());
+      }
+    }
+
+    RemoteMethod method = new RemoteMethod("setQuota",
+        new Class<?>[] {String.class, long.class, long.class,
+            StorageType.class},
+        new RemoteParam(), namespaceQuota, storagespaceQuota, type);
+    rpcClient.invokeConcurrent(locations, method, false, false);
+  }
+
+  /**
+   * Get quota usage for the federation path.
+   * @param path Federation path.
+   * @return Aggregated quota.
+   * @throws IOException If the quota usage cannot be fetched.
+   */
+  public QuotaUsage getQuotaUsage(String path) throws IOException {
+    final List<RemoteLocation> quotaLocs = getValidQuotaLocations(path);
+    RemoteMethod method = new RemoteMethod("getQuotaUsage",
+        new Class<?>[] {String.class}, new RemoteParam());
+    Map<RemoteLocation, QuotaUsage> results = rpcClient.invokeConcurrent(
+        quotaLocs, method, true, false, QuotaUsage.class);
+
+    return aggregateQuota(results);
+  }
+
+  /**
+   * Get valid quota remote locations used in {@link #getQuotaUsage(String)}.
+   * Unlike {@link #getQuotaRemoteLocations(String)}, this method does some
+   * additional filtering.
+   * @param path Federation path.
+   * @return List of valid quota remote locations.
+   * @throws IOException If the remote locations cannot be resolved.
+   */
+  private List<RemoteLocation> getValidQuotaLocations(String path)
+      throws IOException {
+    final List<RemoteLocation> locations = getQuotaRemoteLocations(path);
+
+    // NameService -> Locations
+    Map<String, List<RemoteLocation>> validLocations = new HashMap<>();
+    for (RemoteLocation loc : locations) {
+      String nsId = loc.getNameserviceId();
+      List<RemoteLocation> dests = validLocations.get(nsId);
+      if (dests == null) {
+        dests = new LinkedList<>();
+        dests.add(loc);
+        validLocations.put(nsId, dests);
+      } else {
+        // Ensure the paths in the same nameservice are different.
+        // Don't include parent-child paths.
+        boolean isChildPath = false;
+        for (RemoteLocation d : dests) {
+          if (loc.getDest().startsWith(d.getDest())) {
+            isChildPath = true;
+            break;
+          }
+        }
+
+        if (!isChildPath) {
+          dests.add(loc);
+        }
+      }
+    }
+
+    List<RemoteLocation> quotaLocs = new LinkedList<>();
+    for (List<RemoteLocation> locs : validLocations.values()) {
+      quotaLocs.addAll(locs);
+    }
+
+    return quotaLocs;
+  }
+
+  /**
+   * Aggregate the quota usage queried from the sub-clusters.
+   * @param results Quota query result.
+   * @return Aggregated Quota.
+   */
+  private QuotaUsage aggregateQuota(Map<RemoteLocation, QuotaUsage> results) {
+    long nsCount = 0;
+    long ssCount = 0;
+    boolean hasQuotaUnSet = false;
+
+    for (Map.Entry<RemoteLocation, QuotaUsage> entry : results.entrySet()) {
+      RemoteLocation loc = entry.getKey();
+      QuotaUsage usage = entry.getValue();
+      if (usage != null) {
+        // If the quota is not set in the real FileSystem, the usage
+        // value will be -1.
+        if (usage.getQuota() == -1 && usage.getSpaceQuota() == -1) {
+          hasQuotaUnSet = true;
+        }
+
+        nsCount += usage.getFileAndDirectoryCount();
+        ssCount += usage.getSpaceConsumed();
+        LOG.debug(
+            "Get quota usage for path: nsId: {}, dest: {},"
+                + " nsCount: {}, ssCount: {}.",
+            loc.getNameserviceId(), loc.getDest(),
+            usage.getFileAndDirectoryCount(), usage.getSpaceConsumed());
+      }
+    }
+
+    QuotaUsage.Builder builder = new QuotaUsage.Builder()
+        .fileAndDirectoryCount(nsCount).spaceConsumed(ssCount);
+    if (hasQuotaUnSet) {
+      builder.quota(HdfsConstants.QUOTA_DONT_SET);
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Get all quota remote locations across subclusters under given
+   * federation path.
+   * @param path Federation path.
+   * @return List of quota remote locations.
+   * @throws IOException If the remote locations cannot be resolved.
+   */
+  private List<RemoteLocation> getQuotaRemoteLocations(String path)
+      throws IOException {
+    List<RemoteLocation> locations = new LinkedList<>();
+    RouterQuotaManager manager = this.router.getQuotaManager();
+    if (manager != null) {
+      Set<String> childrenPaths = manager.getPaths(path);
+      for (String childPath : childrenPaths) {
+        locations.addAll(rpcServer.getLocationsForPath(childPath, true));
+      }
+    }
+
+    return locations;
+  }
+}
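
For illustration, a sketch of the aggregation semantics implemented by
aggregateQuota() above; the two QuotaUsage values stand in for results from
two subclusters, and all names and numbers are hypothetical:

import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;

public class QuotaAggregationSketch {
  public static void main(String[] args) {
    QuotaUsage ns0 = new QuotaUsage.Builder()
        .fileAndDirectoryCount(100).spaceConsumed(1024)
        .quota(1000).spaceQuota(8192).build();
    QuotaUsage ns1 = new QuotaUsage.Builder()
        .fileAndDirectoryCount(50).spaceConsumed(512)
        .quota(-1).spaceQuota(-1).build(); // quota unset in this subcluster

    // Counts and consumed space are summed across subclusters:
    long nsCount = ns0.getFileAndDirectoryCount()
        + ns1.getFileAndDirectoryCount(); // 150
    long ssCount = ns0.getSpaceConsumed() + ns1.getSpaceConsumed(); // 1536

    // Because ns1 has no quota set (-1), the aggregated quota is reported
    // as HdfsConstants.QUOTA_DONT_SET rather than a summed limit.
    QuotaUsage aggregated = new QuotaUsage.Builder()
        .fileAndDirectoryCount(nsCount).spaceConsumed(ssCount)
        .quota(HdfsConstants.QUOTA_DONT_SET).build();
    System.out.println(aggregated);
  }
}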

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
new file mode 100644
index 0000000..170b876
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCPerformanceMonitor;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl;
+import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Config fields for router-based hdfs federation.
+ */
+@InterfaceAudience.Private
+public class RBFConfigKeys extends CommonConfigurationKeysPublic {
+
+  // HDFS Router-based federation
+  public static final String FEDERATION_ROUTER_PREFIX =
+      "dfs.federation.router.";
+  public static final String DFS_ROUTER_DEFAULT_NAMESERVICE =
+      FEDERATION_ROUTER_PREFIX + "default.nameserviceId";
+  public static final String DFS_ROUTER_HANDLER_COUNT_KEY =
+      FEDERATION_ROUTER_PREFIX + "handler.count";
+  public static final int DFS_ROUTER_HANDLER_COUNT_DEFAULT = 10;
+  public static final String DFS_ROUTER_READER_QUEUE_SIZE_KEY =
+      FEDERATION_ROUTER_PREFIX + "reader.queue.size";
+  public static final int DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT = 100;
+  public static final String DFS_ROUTER_READER_COUNT_KEY =
+      FEDERATION_ROUTER_PREFIX + "reader.count";
+  public static final int DFS_ROUTER_READER_COUNT_DEFAULT = 1;
+  public static final String DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY =
+      FEDERATION_ROUTER_PREFIX + "handler.queue.size";
+  public static final int DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT = 100;
+  public static final String DFS_ROUTER_RPC_BIND_HOST_KEY =
+      FEDERATION_ROUTER_PREFIX + "rpc-bind-host";
+  public static final int DFS_ROUTER_RPC_PORT_DEFAULT = 8888;
+  public static final String DFS_ROUTER_RPC_ADDRESS_KEY =
+      FEDERATION_ROUTER_PREFIX + "rpc-address";
+  public static final String DFS_ROUTER_RPC_ADDRESS_DEFAULT =
+      "0.0.0.0:" + DFS_ROUTER_RPC_PORT_DEFAULT;
+  public static final String DFS_ROUTER_RPC_ENABLE =
+      FEDERATION_ROUTER_PREFIX + "rpc.enable";
+  public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
+
+  public static final String DFS_ROUTER_METRICS_ENABLE =
+      FEDERATION_ROUTER_PREFIX + "metrics.enable";
+  public static final boolean DFS_ROUTER_METRICS_ENABLE_DEFAULT = true;
+  public static final String DFS_ROUTER_METRICS_CLASS =
+      FEDERATION_ROUTER_PREFIX + "metrics.class";
+  public static final Class<? extends RouterRpcMonitor>
+      DFS_ROUTER_METRICS_CLASS_DEFAULT =
+      FederationRPCPerformanceMonitor.class;
+
+  // HDFS Router heartbeat
+  public static final String DFS_ROUTER_HEARTBEAT_ENABLE =
+      FEDERATION_ROUTER_PREFIX + "heartbeat.enable";
+  public static final boolean DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT = true;
+  public static final String DFS_ROUTER_HEARTBEAT_INTERVAL_MS =
+      FEDERATION_ROUTER_PREFIX + "heartbeat.interval";
+  public static final long DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT =
+      TimeUnit.SECONDS.toMillis(5);
+  public static final String DFS_ROUTER_MONITOR_NAMENODE =
+      FEDERATION_ROUTER_PREFIX + "monitor.namenode";
+  public static final String DFS_ROUTER_MONITOR_LOCAL_NAMENODE =
+      FEDERATION_ROUTER_PREFIX + "monitor.localnamenode.enable";
+  public static final boolean DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT = true;
+  public static final String DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS =
+      FEDERATION_ROUTER_PREFIX + "heartbeat-state.interval";
+  public static final long DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS_DEFAULT =
+      TimeUnit.SECONDS.toMillis(5);
+
+  // HDFS Router NN client
+  public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE =
+      FEDERATION_ROUTER_PREFIX + "connection.pool-size";
+  public static final int DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT =
+      64;
+  public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN =
+      FEDERATION_ROUTER_PREFIX + "connection.pool.clean.ms";
+  public static final long DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN_DEFAULT =
+      TimeUnit.MINUTES.toMillis(1);
+  public static final String DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS =
+      FEDERATION_ROUTER_PREFIX + "connection.clean.ms";
+  public static final long DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT =
+      TimeUnit.SECONDS.toMillis(10);
+
+  // HDFS Router RPC client
+  public static final String DFS_ROUTER_CLIENT_THREADS_SIZE =
+      FEDERATION_ROUTER_PREFIX + "client.thread-size";
+  public static final int DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT = 32;
+  public static final String DFS_ROUTER_CLIENT_MAX_ATTEMPTS =
+      FEDERATION_ROUTER_PREFIX + "client.retry.max.attempts";
+  public static final int DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT = 3;
+
+  // HDFS Router State Store connection
+  public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =
+      FEDERATION_ROUTER_PREFIX + "file.resolver.client.class";
+  public static final Class<? extends FileSubclusterResolver>
+      FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT =
+      MountTableResolver.class;
+  public static final String FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS =
+      FEDERATION_ROUTER_PREFIX + "namenode.resolver.client.class";
+  public static final Class<? extends ActiveNamenodeResolver>
+      FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS_DEFAULT =
+      MembershipNamenodeResolver.class;
+
+  // HDFS Router-based federation State Store
+  public static final String FEDERATION_STORE_PREFIX =
+      FEDERATION_ROUTER_PREFIX + "store.";
+
+  public static final String DFS_ROUTER_STORE_ENABLE =
+      FEDERATION_STORE_PREFIX + "enable";
+  public static final boolean DFS_ROUTER_STORE_ENABLE_DEFAULT = true;
+
+  public static final String FEDERATION_STORE_SERIALIZER_CLASS =
+      FEDERATION_STORE_PREFIX + "serializer";
+  public static final Class<StateStoreSerializerPBImpl>
+      FEDERATION_STORE_SERIALIZER_CLASS_DEFAULT =
+      StateStoreSerializerPBImpl.class;
+
+  public static final String FEDERATION_STORE_DRIVER_CLASS =
+      FEDERATION_STORE_PREFIX + "driver.class";
+  public static final Class<? extends StateStoreDriver>
+      FEDERATION_STORE_DRIVER_CLASS_DEFAULT = StateStoreZooKeeperImpl.class;
+
+  public static final String FEDERATION_STORE_CONNECTION_TEST_MS =
+      FEDERATION_STORE_PREFIX + "connection.test";
+  public static final long FEDERATION_STORE_CONNECTION_TEST_MS_DEFAULT =
+      TimeUnit.MINUTES.toMillis(1);
+
+  public static final String DFS_ROUTER_CACHE_TIME_TO_LIVE_MS =
+      FEDERATION_ROUTER_PREFIX + "cache.ttl";
+  public static final long DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT =
+      TimeUnit.MINUTES.toMillis(1);
+
+  public static final String FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS =
+      FEDERATION_STORE_PREFIX + "membership.expiration";
+  public static final long FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT =
+      TimeUnit.MINUTES.toMillis(5);
+  public static final String FEDERATION_STORE_ROUTER_EXPIRATION_MS =
+      FEDERATION_STORE_PREFIX + "router.expiration";
+  public static final long FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT =
+      TimeUnit.MINUTES.toMillis(5);
+
+  // HDFS Router safe mode
+  public static final String DFS_ROUTER_SAFEMODE_ENABLE =
+      FEDERATION_ROUTER_PREFIX + "safemode.enable";
+  public static final boolean DFS_ROUTER_SAFEMODE_ENABLE_DEFAULT = true;
+  public static final String DFS_ROUTER_SAFEMODE_EXTENSION =
+      FEDERATION_ROUTER_PREFIX + "safemode.extension";
+  public static final long DFS_ROUTER_SAFEMODE_EXTENSION_DEFAULT =
+      TimeUnit.SECONDS.toMillis(30);
+  public static final String DFS_ROUTER_SAFEMODE_EXPIRATION =
+      FEDERATION_ROUTER_PREFIX + "safemode.expiration";
+  public static final long DFS_ROUTER_SAFEMODE_EXPIRATION_DEFAULT =
+      3 * DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT;
+
+  // HDFS Router-based federation mount table entries
+  /** Maximum number of cache entries to have. */
+  public static final String FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE =
+      FEDERATION_ROUTER_PREFIX + "mount-table.max-cache-size";
+  /** Remove cache entries if we have more than 10k. */
+  public static final int FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE_DEFAULT =
+      10000;
+
+  // HDFS Router-based federation admin
+  public static final String DFS_ROUTER_ADMIN_HANDLER_COUNT_KEY =
+      FEDERATION_ROUTER_PREFIX + "admin.handler.count";
+  public static final int DFS_ROUTER_ADMIN_HANDLER_COUNT_DEFAULT = 1;
+  public static final int    DFS_ROUTER_ADMIN_PORT_DEFAULT = 8111;
+  public static final String DFS_ROUTER_ADMIN_ADDRESS_KEY =
+      FEDERATION_ROUTER_PREFIX + "admin-address";
+  public static final String DFS_ROUTER_ADMIN_ADDRESS_DEFAULT =
+      "0.0.0.0:" + DFS_ROUTER_ADMIN_PORT_DEFAULT;
+  public static final String DFS_ROUTER_ADMIN_BIND_HOST_KEY =
+      FEDERATION_ROUTER_PREFIX + "admin-bind-host";
+  public static final String DFS_ROUTER_ADMIN_ENABLE =
+      FEDERATION_ROUTER_PREFIX + "admin.enable";
+  public static final boolean DFS_ROUTER_ADMIN_ENABLE_DEFAULT = true;
+
+  // HDFS Router-based federation web
+  public static final String DFS_ROUTER_HTTP_ENABLE =
+      FEDERATION_ROUTER_PREFIX + "http.enable";
+  public static final boolean DFS_ROUTER_HTTP_ENABLE_DEFAULT = true;
+  public static final String DFS_ROUTER_HTTP_ADDRESS_KEY =
+      FEDERATION_ROUTER_PREFIX + "http-address";
+  public static final int    DFS_ROUTER_HTTP_PORT_DEFAULT = 50071;
+  public static final String DFS_ROUTER_HTTP_BIND_HOST_KEY =
+      FEDERATION_ROUTER_PREFIX + "http-bind-host";
+  public static final String DFS_ROUTER_HTTP_ADDRESS_DEFAULT =
+      "0.0.0.0:" + DFS_ROUTER_HTTP_PORT_DEFAULT;
+  public static final String DFS_ROUTER_HTTPS_ADDRESS_KEY =
+      FEDERATION_ROUTER_PREFIX + "https-address";
+  public static final int    DFS_ROUTER_HTTPS_PORT_DEFAULT = 50072;
+  public static final String DFS_ROUTER_HTTPS_BIND_HOST_KEY =
+      FEDERATION_ROUTER_PREFIX + "https-bind-host";
+  public static final String DFS_ROUTER_HTTPS_ADDRESS_DEFAULT =
+      "0.0.0.0:" + DFS_ROUTER_HTTPS_PORT_DEFAULT;
+
+  // HDFS Router-based federation quota
+  public static final String DFS_ROUTER_QUOTA_ENABLE =
+      FEDERATION_ROUTER_PREFIX + "quota.enable";
+  public static final boolean DFS_ROUTER_QUOTA_ENABLED_DEFAULT = false;
+  public static final String DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL =
+      FEDERATION_ROUTER_PREFIX + "quota-cache.update.interval";
+  public static final long DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL_DEFAULT =
+      60000;
+}
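
These keys are plain Configuration entries; a short sketch of overriding a
couple of the defaults programmatically (the values chosen here are arbitrary
examples):

import org.apache.hadoop.conf.Configuration;

public class RBFConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Heartbeat every 10 seconds instead of the 5-second default.
    conf.setLong(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS, 10000L);
    // Quota handling is disabled by default; turn it on.
    conf.setBoolean(RBFConfigKeys.DFS_ROUTER_QUOTA_ENABLE, true);

    long interval = conf.getLong(
        RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS,
        RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT);
    System.out.println("heartbeat interval (ms): " + interval);
  }
}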

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java
new file mode 100644
index 0000000..a90c460
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+/**
+ * Base class for objects that are unique to a namespace.
+ */
+public abstract class RemoteLocationContext
+    implements Comparable<RemoteLocationContext> {
+
+  /**
+   * Returns an identifier for a unique namespace.
+   *
+   * @return Namespace identifier.
+   */
+  public abstract String getNameserviceId();
+
+  /**
+   * Destination in this location. For example, the path in a remote namespace.
+   *
+   * @return Destination in this location.
+   */
+  public abstract String getDest();
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(17, 31)
+        .append(getNameserviceId())
+        .append(getDest())
+        .toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof RemoteLocationContext) {
+      RemoteLocationContext other = (RemoteLocationContext) obj;
+      return this.getNameserviceId().equals(other.getNameserviceId()) &&
+          this.getDest().equals(other.getDest());
+    }
+    return false;
+  }
+
+  @Override
+  public int compareTo(RemoteLocationContext info) {
+    int ret = this.getNameserviceId().compareTo(info.getNameserviceId());
+    if (ret == 0) {
+      ret = this.getDest().compareTo(info.getDest());
+    }
+    return ret;
+  }
+}
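
A minimal concrete subclass, for illustration only (the patch's real
implementations live elsewhere, e.g. RemoteLocation in the resolver package):

/** Hypothetical location: a destination path inside one nameservice. */
public class SimpleLocation extends RemoteLocationContext {

  private final String nameserviceId;
  private final String dest;

  public SimpleLocation(String nameserviceId, String dest) {
    this.nameserviceId = nameserviceId;
    this.dest = dest;
  }

  @Override
  public String getNameserviceId() {
    return this.nameserviceId;
  }

  @Override
  public String getDest() {
    return this.dest;
  }
  // equals(), hashCode() and compareTo() are inherited from the base class,
  // which keys both on the nameservice and the destination.
}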

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
new file mode 100644
index 0000000..cd57d45
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Determines the remote client protocol method and the parameter list for a
+ * specific location.
+ */
+public class RemoteMethod {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RemoteMethod.class);
+
+
+  /** List of parameters: static and dynamic values, matching types. */
+  private final Object[] params;
+  /** List of method parameters types, matches parameters. */
+  private final Class<?>[] types;
+  /** String name of the ClientProtocol method. */
+  private final String methodName;
+
+  /**
+   * Create a method with no parameters.
+   *
+   * @param method The string name of the ClientProtocol method.
+   */
+  public RemoteMethod(String method) {
+    this.params = null;
+    this.types = null;
+    this.methodName = method;
+  }
+
+  /**
+   * Creates a remote method generator.
+   *
+   * @param method The string name of the ClientProtocol method.
+   * @param pTypes A list of types to use to locate the specific method.
+   * @param pParams A list of parameters for the method. The order of the
+   *          parameter list must match the order and number of the types.
+   *          Parameters are grouped into 2 categories:
+   *          <ul>
+   *          <li>Static parameters that are immutable across locations.
+   *          <li>Dynamic parameters that are determined for each location by a
+   *          RemoteParam object. To specify a dynamic parameter, pass an
+   *          instance of RemoteParam in place of the parameter value.
+   *          </ul>
+   * @throws IOException If the types and parameter lists are not valid.
+   */
+  public RemoteMethod(String method, Class<?>[] pTypes, Object... pParams)
+      throws IOException {
+
+    if (pParams.length != pTypes.length) {
+      throw new IOException("Invalid parameters for method " + method);
+    }
+
+    this.params = pParams;
+    this.types = pTypes;
+    this.methodName = method;
+  }
+
+  /**
+   * Get the represented java method.
+   *
+   * @return Method
+   * @throws IOException If the method cannot be found.
+   */
+  public Method getMethod() throws IOException {
+    try {
+      if (types != null) {
+        return ClientProtocol.class.getDeclaredMethod(methodName, types);
+      } else {
+        return ClientProtocol.class.getDeclaredMethod(methodName);
+      }
+    } catch (NoSuchMethodException e) {
+      // Re-throw as an IOException
+      LOG.error("Cannot get method {} with types {}",
+          methodName, Arrays.toString(types), e);
+      throw new IOException(e);
+    } catch (SecurityException e) {
+      LOG.error("Cannot access method {} with types {}",
+          methodName, Arrays.toString(types), e);
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Get the calling types for this method.
+   *
+   * @return An array of calling types.
+   */
+  public Class<?>[] getTypes() {
+    return this.types;
+  }
+
+  /**
+   * Generate a list of parameters for this specific location using no context.
+   *
+   * @return A list of parameters for the method customized for the location.
+   */
+  public Object[] getParams() {
+    return this.getParams(null);
+  }
+
+  /**
+   * Get the name of the method.
+   *
+   * @return Name of the method.
+   */
+  public String getMethodName() {
+    return this.methodName;
+  }
+
+  /**
+   * Generate a list of parameters for this specific location. Parameters are
+   * grouped into 2 categories:
+   * <ul>
+   * <li>Static parameters that are immutable across locations.
+   * <li>Dynamic parameters that are determined for each location by a
+   * RemoteParam object.
+   * </ul>
+   *
+   * @param context The context identifying the location.
+   * @return A list of parameters for the method customized for the location.
+   */
+  public Object[] getParams(RemoteLocationContext context) {
+    if (this.params == null) {
+      return new Object[] {};
+    }
+    Object[] objList = new Object[this.params.length];
+    for (int i = 0; i < this.params.length; i++) {
+      Object currentObj = this.params[i];
+      if (currentObj instanceof RemoteParam) {
+        // Map the parameter using the context
+        RemoteParam paramGetter = (RemoteParam) currentObj;
+        objList[i] = paramGetter.getParameterForContext(context);
+      } else {
+        objList[i] = currentObj;
+      }
+    }
+    return objList;
+  }
+}
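
A sketch of the static/dynamic parameter split described above, using the
real ClientProtocol method getFileInfo(String); the location argument is a
hypothetical RemoteLocationContext:

import java.io.IOException;

public class RemoteMethodSketch {
  public static Object[] buildArgs(RemoteLocationContext location)
      throws IOException {
    // One String parameter that differs per location, so it is passed as a
    // RemoteParam; getParams(location) substitutes location.getDest().
    RemoteMethod method = new RemoteMethod("getFileInfo",
        new Class<?>[] {String.class}, new RemoteParam());
    // For a location whose getDest() is "/ns0/data": {"/ns0/data"}.
    return method.getParams(location);
  }
}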

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java
new file mode 100644
index 0000000..8816ff6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.util.Map;
+
+/**
+ * A dynamically assignable parameter that is location-specific.
+ * <p>
+ * There are 2 ways this mapping is determined:
+ * <ul>
+ * <li>Default: Uses the RemoteLocationContext's destination
+ * <li>Map: Uses the value of the RemoteLocationContext key provided in the
+ * parameter map.
+ * </ul>
+ */
+public class RemoteParam {
+
+  private final Map<? extends Object, ? extends Object> paramMap;
+
+  /**
+   * Constructs a default remote parameter. Always maps the value to the
+   * destination of the provided RemoteLocationContext.
+   */
+  public RemoteParam() {
+    this.paramMap = null;
+  }
+
+  /**
+   * Constructs a map based remote parameter. Determines the value using the
+   * provided RemoteLocationContext as a key into the map.
+   *
+   * @param map Map with RemoteLocationContext keys.
+   */
+  public RemoteParam(
+      Map<? extends RemoteLocationContext, ? extends Object> map) {
+    this.paramMap = map;
+  }
+
+  /**
+   * Determine the appropriate value for this parameter based on the location.
+   *
+   * @param context Context identifying the location.
+   * @return A parameter specific to this location.
+   */
+  public Object getParameterForContext(RemoteLocationContext context) {
+    if (context == null) {
+      return null;
+    } else if (this.paramMap != null) {
+      return this.paramMap.get(context);
+    } else {
+      // Default case
+      return context.getDest();
+    }
+  }
+}
\ No newline at end of file
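
A sketch of the map-based mode (loc0 and loc1 are hypothetical
RemoteLocationContext instances, e.g. one per destination nameservice, and
the paths are arbitrary examples):

import java.util.HashMap;
import java.util.Map;

public class RemoteParamSketch {
  public static RemoteParam perLocationDest(
      RemoteLocationContext loc0, RemoteLocationContext loc1) {
    // Each location resolves to its own value instead of getDest().
    Map<RemoteLocationContext, String> dests = new HashMap<>();
    dests.put(loc0, "/ns0/newName");
    dests.put(loc1, "/ns1/newName");
    RemoteParam dstParam = new RemoteParam(dests);
    // dstParam.getParameterForContext(loc0) returns "/ns0/newName".
    return dstParam;
  }
}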

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
new file mode 100644
index 0000000..38f5d4f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
@@ -0,0 +1,655 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.newActiveNamenodeResolver;
+import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.newFileSubclusterResolver;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.JvmPauseMonitor;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Router that provides a unified view of multiple federated HDFS clusters. It
+ * has two main roles: (1) federated interface and (2) NameNode heartbeat.
+ * <p>
+ * For the federated interface, the Router receives a client request, checks
+ * the State Store for the correct subcluster, and forwards the request to the
+ * active Namenode of that subcluster. The reply from the Namenode then flows
+ * in the opposite direction. The Routers are stateless and can be behind a
+ * load balancer. HDFS clients connect to the Router using the same interfaces
+ * as are used to communicate with a Namenode, namely the ClientProtocol RPC
+ * interface and the WebHdfs HTTP interface exposed by the Router.
+ * {@link RouterRpcServer} {@link RouterHttpServer}
+ * <p>
+ * For NameNode heartbeat, the Router periodically checks the state of a
+ * NameNode (usually on the same server) and reports its high availability
+ * (HA) state and load/space status to the State Store. Note that this is an
+ * optional role, as a Router can be independent of any subcluster.
+ * {@link StateStoreService} {@link NamenodeHeartbeatService}
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class Router extends CompositeService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Router.class);
+
+
+  /** Configuration for the Router. */
+  private Configuration conf;
+
+  /** Router address/identifier. */
+  private String routerId;
+
+  /** RPC interface to the client. */
+  private RouterRpcServer rpcServer;
+  private InetSocketAddress rpcAddress;
+
+  /** RPC interface for the admin. */
+  private RouterAdminServer adminServer;
+  private InetSocketAddress adminAddress;
+
+  /** HTTP interface and web application. */
+  private RouterHttpServer httpServer;
+
+  /** Interface with the State Store. */
+  private StateStoreService stateStore;
+
+  /** Interface to map global name space to HDFS subcluster name spaces. */
+  private FileSubclusterResolver subclusterResolver;
+
+  /** Interface to identify the active NN for a nameservice or blockpool ID. */
+  private ActiveNamenodeResolver namenodeResolver;
+  /** Updates the namenode status in the namenode resolver. */
+  private Collection<NamenodeHeartbeatService> namenodeHeartbeatServices;
+
+  /** Router metrics. */
+  private RouterMetricsService metrics;
+
+  /** JVM pauses (GC and others). */
+  private JvmPauseMonitor pauseMonitor;
+
+  /** Quota usage update service. */
+  private RouterQuotaUpdateService quotaUpdateService;
+  /** Quota cache manager. */
+  private RouterQuotaManager quotaManager;
+
+  /** Manages the current state of the router. */
+  private RouterStore routerStateManager;
+  /** Heartbeat our run status to the router state manager. */
+  private RouterHeartbeatService routerHeartbeatService;
+  /** Enter/exit safemode. */
+  private RouterSafemodeService safemodeService;
+
+  /** The start time of the namesystem. */
+  private final long startTime = Time.now();
+
+  /** State of the Router. */
+  private RouterServiceState state = RouterServiceState.UNINITIALIZED;
+
+
+  /////////////////////////////////////////////////////////
+  // Constructor
+  /////////////////////////////////////////////////////////
+
+  public Router() {
+    super(Router.class.getName());
+  }
+
+  /////////////////////////////////////////////////////////
+  // Service management
+  /////////////////////////////////////////////////////////
+
+  @Override
+  protected void serviceInit(Configuration configuration) throws Exception {
+    this.conf = configuration;
+    updateRouterState(RouterServiceState.INITIALIZING);
+
+    if (conf.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_STORE_ENABLE,
+        RBFConfigKeys.DFS_ROUTER_STORE_ENABLE_DEFAULT)) {
+      // Service that maintains the State Store connection
+      this.stateStore = new StateStoreService();
+      addService(this.stateStore);
+    }
+
+    // Resolver to track active NNs
+    this.namenodeResolver = newActiveNamenodeResolver(
+        this.conf, this.stateStore);
+    if (this.namenodeResolver == null) {
+      throw new IOException("Cannot find namenode resolver.");
+    }
+
+    // Lookup interface to map between the global and subcluster name spaces
+    this.subclusterResolver = newFileSubclusterResolver(this.conf, this);
+    if (this.subclusterResolver == null) {
+      throw new IOException("Cannot find subcluster resolver");
+    }
+
+    if (conf.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_RPC_ENABLE,
+        RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_DEFAULT)) {
+      // Create RPC server
+      this.rpcServer = createRpcServer();
+      addService(this.rpcServer);
+      this.setRpcServerAddress(rpcServer.getRpcAddress());
+    }
+
+    if (conf.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_ADMIN_ENABLE,
+        RBFConfigKeys.DFS_ROUTER_ADMIN_ENABLE_DEFAULT)) {
+      // Create admin server
+      this.adminServer = createAdminServer();
+      addService(this.adminServer);
+    }
+
+    if (conf.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_HTTP_ENABLE,
+        RBFConfigKeys.DFS_ROUTER_HTTP_ENABLE_DEFAULT)) {
+      // Create HTTP server
+      this.httpServer = createHttpServer();
+      addService(this.httpServer);
+    }
+
+    if (conf.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
+        RBFConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT)) {
+
+      // Create status updater for each monitored Namenode
+      this.namenodeHeartbeatServices = createNamenodeHeartbeatServices();
+      for (NamenodeHeartbeatService heartbeatService :
+          this.namenodeHeartbeatServices) {
+        addService(heartbeatService);
+      }
+
+      if (this.namenodeHeartbeatServices.isEmpty()) {
+        LOG.error("Heartbeat is enabled but there are no namenodes to 
monitor");
+      }
+
+      // Periodically update the router state
+      this.routerHeartbeatService = new RouterHeartbeatService(this);
+      addService(this.routerHeartbeatService);
+    }
+
+    // Router metrics system
+    if (conf.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE,
+        RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE_DEFAULT)) {
+
+      DefaultMetricsSystem.initialize("Router");
+
+      this.metrics = new RouterMetricsService(this);
+      addService(this.metrics);
+
+      // JVM pause monitor
+      this.pauseMonitor = new JvmPauseMonitor();
+      this.pauseMonitor.init(conf);
+    }
+
+    // Initialize the quota-related services
+    if (conf.getBoolean(RBFConfigKeys.DFS_ROUTER_QUOTA_ENABLE,
+        RBFConfigKeys.DFS_ROUTER_QUOTA_ENABLED_DEFAULT)) {
+      this.quotaManager = new RouterQuotaManager();
+      this.quotaUpdateService = new RouterQuotaUpdateService(this);
+      addService(this.quotaUpdateService);
+    }
+
+    // Safemode service to refuse RPC calls when the router is out of sync
+    if (conf.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE,
+        RBFConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE_DEFAULT)) {
+      // Create safemode monitoring service
+      this.safemodeService = new RouterSafemodeService(this);
+      addService(this.safemodeService);
+    }
+
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+
+    if (this.safemodeService == null) {
+      // Router is running now
+      updateRouterState(RouterServiceState.RUNNING);
+    }
+
+    if (this.pauseMonitor != null) {
+      this.pauseMonitor.start();
+      JvmMetrics jvmMetrics = this.metrics.getJvmMetrics();
+      if (jvmMetrics != null) {
+        jvmMetrics.setPauseMonitor(pauseMonitor);
+      }
+    }
+
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+
+    // Update state
+    updateRouterState(RouterServiceState.SHUTDOWN);
+
+    // JVM pause monitor
+    if (this.pauseMonitor != null) {
+      this.pauseMonitor.stop();
+    }
+
+    super.serviceStop();
+  }
+
+  /**
+   * Shutdown the router.
+   */
+  public void shutDown() {
+    new Thread() {
+      @Override
+      public void run() {
+        Router.this.stop();
+      }
+    }.start();
+  }
+
+  /////////////////////////////////////////////////////////
+  // RPC Server
+  /////////////////////////////////////////////////////////
+
+  /**
+   * Create a new Router RPC server to proxy ClientProtocol requests.
+   *
+   * @return New Router RPC Server.
+   * @throws IOException If the router RPC server was not started.
+   */
+  protected RouterRpcServer createRpcServer() throws IOException {
+    return new RouterRpcServer(this.conf, this, this.getNamenodeResolver(),
+        this.getSubclusterResolver());
+  }
+
+  /**
+   * Get the Router RPC server.
+   *
+   * @return Router RPC server.
+   */
+  public RouterRpcServer getRpcServer() {
+    return this.rpcServer;
+  }
+
+  /**
+   * Set the current RPC socket for the router.
+   *
+   * @param address RPC address.
+   */
+  protected void setRpcServerAddress(InetSocketAddress address) {
+    this.rpcAddress = address;
+
+    // Use the RPC address as our unique router Id
+    if (this.rpcAddress != null) {
+      try {
+        String hostname = InetAddress.getLocalHost().getHostName();
+        setRouterId(hostname + ":" + this.rpcAddress.getPort());
+      } catch (UnknownHostException ex) {
+        LOG.error("Cannot set unique router ID, address not resolvable {}",
+            this.rpcAddress);
+      }
+    }
+  }
+
+  /**
+   * Get the current RPC socket address for the router.
+   *
+   * @return InetSocketAddress RPC address.
+   */
+  public InetSocketAddress getRpcServerAddress() {
+    return this.rpcAddress;
+  }
+
+  /////////////////////////////////////////////////////////
+  // Admin server
+  /////////////////////////////////////////////////////////
+
+  /**
+   * Create a new router admin server to handle the router admin interface.
+   *
+   * @return RouterAdminServer
+   * @throws IOException If the admin server was not successfully started.
+   */
+  protected RouterAdminServer createAdminServer() throws IOException {
+    return new RouterAdminServer(this.conf, this);
+  }
+
+  /**
+   * Set the current Admin socket for the router.
+   *
+   * @param address Admin RPC address.
+   */
+  protected void setAdminServerAddress(InetSocketAddress address) {
+    this.adminAddress = address;
+  }
+
+  /**
+   * Get the current Admin socket address for the router.
+   *
+   * @return InetSocketAddress Admin address.
+   */
+  public InetSocketAddress getAdminServerAddress() {
+    return adminAddress;
+  }
+
+  /////////////////////////////////////////////////////////
+  // HTTP server
+  /////////////////////////////////////////////////////////
+
+  /**
+   * Create an HTTP server for this Router.
+   *
+   * @return HTTP server for this Router.
+   */
+  protected RouterHttpServer createHttpServer() {
+    return new RouterHttpServer(this);
+  }
+
+  /**
+   * Get the current HTTP socket address for the router.
+   *
+   * @return InetSocketAddress HTTP address.
+   */
+  public InetSocketAddress getHttpServerAddress() {
+    if (httpServer != null) {
+      return httpServer.getHttpAddress();
+    }
+    return null;
+  }
+
+  /////////////////////////////////////////////////////////
+  // Namenode heartbeat monitors
+  /////////////////////////////////////////////////////////
+
+  /**
+   * Create each of the services that will monitor a Namenode.
+   *
+   * @return List of heartbeat services.
+   */
+  protected Collection<NamenodeHeartbeatService>
+      createNamenodeHeartbeatServices() {
+
+    Map<String, NamenodeHeartbeatService> ret = new HashMap<>();
+
+    if (conf.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE,
+        RBFConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT)) {
+      // Create a local heartbeat service
+      NamenodeHeartbeatService localHeartbeatService =
+          createLocalNamenodeHeartbeatService();
+      if (localHeartbeatService != null) {
+        String nnDesc = localHeartbeatService.getNamenodeDesc();
+        ret.put(nnDesc, localHeartbeatService);
+      }
+    }
+
+    // Create heartbeat services for a list specified by the admin
+    String namenodes = this.conf.get(
+        RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE);
+    if (namenodes != null) {
+      for (String namenode : namenodes.split(",")) {
+        String[] namenodeSplit = namenode.split("\\.");
+        String nsId = null;
+        String nnId = null;
+        if (namenodeSplit.length == 2) {
+          nsId = namenodeSplit[0];
+          nnId = namenodeSplit[1];
+        } else if (namenodeSplit.length == 1) {
+          nsId = namenode;
+        } else {
+          LOG.error("Wrong Namenode to monitor: {}", namenode);
+        }
+        if (nsId != null) {
+          NamenodeHeartbeatService heartbeatService =
+              createNamenodeHeartbeatService(nsId, nnId);
+          if (heartbeatService != null) {
+            ret.put(heartbeatService.getNamenodeDesc(), heartbeatService);
+          }
+        }
+      }
+    }
+
+    return ret.values();
+  }
+
+  /**
+   * Create a new status updater for the local Namenode.
+   *
+   * @return Updater of the status for the local Namenode.
+   */
+  protected NamenodeHeartbeatService createLocalNamenodeHeartbeatService() {
+    // Detect NN running in this machine
+    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
+    String nnId = null;
+    if (HAUtil.isHAEnabled(conf, nsId)) {
+      nnId = HAUtil.getNameNodeId(conf, nsId);
+      if (nnId == null) {
+        LOG.error("Cannot find namenode id for local {}", nsId);
+      }
+    }
+
+    return createNamenodeHeartbeatService(nsId, nnId);
+  }
+
+  /**
+   * Create a heartbeat monitor for a particular Namenode.
+   *
+   * @param nsId Identifier of the nameservice to monitor.
+   * @param nnId Identifier of the namenode (HA) to monitor.
+   * @return Updater of the status for the specified Namenode.
+   */
+  protected NamenodeHeartbeatService createNamenodeHeartbeatService(
+      String nsId, String nnId) {
+
+    LOG.info("Creating heartbeat service for Namenode {} in {}", nnId, nsId);
+    NamenodeHeartbeatService ret = new NamenodeHeartbeatService(
+        namenodeResolver, nsId, nnId);
+    return ret;
+  }
+
+  /////////////////////////////////////////////////////////
+  // Router State Management
+  /////////////////////////////////////////////////////////
+
+  /**
+   * Update the router state and heartbeat to the state store.
+   *
+   * @param newState The new router state.
+   */
+  public void updateRouterState(RouterServiceState newState) {
+    this.state = newState;
+    if (this.routerHeartbeatService != null) {
+      this.routerHeartbeatService.updateStateAsync();
+    }
+  }
+
+  /**
+   * Get the status of the router.
+   *
+   * @return Status of the router.
+   */
+  public RouterServiceState getRouterState() {
+    return this.state;
+  }
+
+  /////////////////////////////////////////////////////////
+  // Submodule getters
+  /////////////////////////////////////////////////////////
+
+  /**
+   * Get the State Store service.
+   *
+   * @return State Store service.
+   */
+  public StateStoreService getStateStore() {
+    return this.stateStore;
+  }
+
+  /**
+   * Get the metrics system for the Router.
+   *
+   * @return Router metrics.
+   */
+  public RouterMetrics getRouterMetrics() {
+    if (this.metrics != null) {
+      return this.metrics.getRouterMetrics();
+    }
+    return null;
+  }
+
+  /**
+   * Get the federation metrics.
+   *
+   * @return Federation metrics.
+   */
+  public FederationMetrics getMetrics() {
+    if (this.metrics != null) {
+      return this.metrics.getFederationMetrics();
+    }
+    return null;
+  }
+
+  /**
+   * Get the subcluster resolver for files.
+   *
+   * @return Subcluster resolver for files.
+   */
+  public FileSubclusterResolver getSubclusterResolver() {
+    return this.subclusterResolver;
+  }
+
+  /**
+   * Get the namenode resolver for a subcluster.
+   *
+   * @return The namenode resolver for a subcluster.
+   */
+  public ActiveNamenodeResolver getNamenodeResolver() {
+    return this.namenodeResolver;
+  }
+
+  /**
+   * Get the state store interface for the router heartbeats.
+   *
+   * @return RouterStore state store API handle.
+   */
+  public RouterStore getRouterStateManager() {
+    if (this.routerStateManager == null && this.stateStore != null) {
+      this.routerStateManager = this.stateStore.getRegisteredRecordStore(
+          RouterStore.class);
+    }
+    return this.routerStateManager;
+  }
+
+  /////////////////////////////////////////////////////////
+  // Router info
+  /////////////////////////////////////////////////////////
+
+  /**
+   * Get the start date of the Router.
+   *
+   * @return Start date of the router.
+   */
+  public long getStartTime() {
+    return this.startTime;
+  }
+
+  /**
+   * Unique ID for the router, typically the hostname:port string for the
+   * router's RPC server. This ID may be null on router startup before the RPC
+   * server has bound to a port.
+   *
+   * @return Router identifier.
+   */
+  public String getRouterId() {
+    return this.routerId;
+  }
+
+  /**
+   * Sets a unique ID for this router.
+   *
+   * @param id Identifier of the Router.
+   */
+  public void setRouterId(String id) {
+    this.routerId = id;
+    if (this.stateStore != null) {
+      this.stateStore.setIdentifier(this.routerId);
+    }
+    if (this.namenodeResolver != null) {
+      this.namenodeResolver.setRouterId(this.routerId);
+    }
+  }
+
+  /**
+   * Check if the quota system is enabled in the Router.
+   *
+   * @return True if the quota system is enabled.
+   */
+  public boolean isQuotaEnabled() {
+    return this.quotaManager != null;
+  }
+
+  /**
+   * Get the router quota manager.
+   *
+   * @return RouterQuotaManager Quota manager.
+   */
+   */
+  public RouterQuotaManager getQuotaManager() {
+    return this.quotaManager;
+  }
+
+  /**
+   * Get the quota cache update service.
+   *
+   * @return Quota cache update service.
+   */
+  @VisibleForTesting
+  RouterQuotaUpdateService getQuotaCacheUpdateService() {
+    return this.quotaUpdateService;
+  }
+
+  /**
+   * Get the list of namenode heartbeat services.
+   *
+   * @return Collection of namenode heartbeat services.
+   */
+  @VisibleForTesting
+  Collection<NamenodeHeartbeatService> getNamenodeHeartbeatServices() {
+    return this.namenodeHeartbeatServices;
+  }
+}
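
The createNamenodeHeartbeatServices() method above derives its monitoring
targets from a comma-separated list of "nameservice" or "nameservice.namenode"
entries. The standalone sketch below isolates just that parsing step; the
Target class and parse() helper are illustrative, not part of the patch:

    import java.util.ArrayList;
    import java.util.List;

    public class MonitorNamenodeParsing {

      /** A (nameservice, namenode) pair; namenode is null for non-HA entries. */
      static class Target {
        final String nsId;
        final String nnId;
        Target(String nsId, String nnId) {
          this.nsId = nsId;
          this.nnId = nnId;
        }
        @Override
        public String toString() {
          return nnId == null ? nsId : nsId + "." + nnId;
        }
      }

      // Mirrors the parsing in createNamenodeHeartbeatServices(): one or two
      // dot-separated tokens per entry; anything else is rejected.
      static List<Target> parse(String namenodes) {
        List<Target> ret = new ArrayList<>();
        if (namenodes == null) {
          return ret;
        }
        for (String namenode : namenodes.split(",")) {
          String[] split = namenode.split("\\.");
          if (split.length == 2) {
            ret.add(new Target(split[0], split[1]));
          } else if (split.length == 1) {
            ret.add(new Target(namenode, null));
          } else {
            System.err.println("Wrong Namenode to monitor: " + namenode);
          }
        }
        return ret;
      }

      public static void main(String[] args) {
        // "ns0.nn0,ns0.nn1" monitors both HA namenodes of ns0;
        // "ns1" monitors a nameservice without an explicit namenode id.
        System.out.println(parse("ns0.nn0,ns0.nn1,ns1"));
        // Prints: [ns0.nn0, ns0.nn1, ns1]
      }
    }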

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
new file mode 100644
index 0000000..2c195c6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.proto.RouterProtocolProtos.RouterAdminProtocolService;
+import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.EnterSafeModeRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.EnterSafeModeResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingService;
+
+/**
+ * This class is responsible for handling all of the Admin calls to the HDFS
+ * router. It is created, started, and stopped by {@link Router}.
+ */
+public class RouterAdminServer extends AbstractService
+    implements MountTableManager, RouterStateManager {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterAdminServer.class);
+
+  private Configuration conf;
+
+  private final Router router;
+
+  private MountTableStore mountTableStore;
+
+  /** The Admin server that listens to requests from clients. */
+  private final Server adminServer;
+  private final InetSocketAddress adminAddress;
+
+  /**
+   * Permission-related info used for constructing new router permission
+   * checker instances.
+   */
+  private static String routerOwner;
+  private static String superGroup;
+  private static boolean isPermissionEnabled;
+
+  public RouterAdminServer(Configuration conf, Router router)
+      throws IOException {
+    super(RouterAdminServer.class.getName());
+
+    this.conf = conf;
+    this.router = router;
+
+    int handlerCount = this.conf.getInt(
+        RBFConfigKeys.DFS_ROUTER_ADMIN_HANDLER_COUNT_KEY,
+        RBFConfigKeys.DFS_ROUTER_ADMIN_HANDLER_COUNT_DEFAULT);
+
+    RPC.setProtocolEngine(this.conf, RouterAdminProtocolPB.class,
+        ProtobufRpcEngine.class);
+
+    RouterAdminProtocolServerSideTranslatorPB routerAdminProtocolTranslator =
+        new RouterAdminProtocolServerSideTranslatorPB(this);
+    BlockingService clientNNPbService = RouterAdminProtocolService.
+        newReflectiveBlockingService(routerAdminProtocolTranslator);
+
+    InetSocketAddress confRpcAddress = conf.getSocketAddr(
+        RBFConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY,
+        RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
+        RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT,
+        RBFConfigKeys.DFS_ROUTER_ADMIN_PORT_DEFAULT);
+
+    String bindHost = conf.get(
+        RBFConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY,
+        confRpcAddress.getHostName());
+    LOG.info("Admin server binding to {}:{}",
+        bindHost, confRpcAddress.getPort());
+
+    initializePermissionSettings(this.conf);
+    this.adminServer = new RPC.Builder(this.conf)
+        .setProtocol(RouterAdminProtocolPB.class)
+        .setInstance(clientNNPbService)
+        .setBindAddress(bindHost)
+        .setPort(confRpcAddress.getPort())
+        .setNumHandlers(handlerCount)
+        .setVerbose(false)
+        .build();
+
+    // The RPC-server port can be ephemeral... ensure we have the correct info
+    InetSocketAddress listenAddress = this.adminServer.getListenerAddress();
+    this.adminAddress = new InetSocketAddress(
+        confRpcAddress.getHostName(), listenAddress.getPort());
+    router.setAdminServerAddress(this.adminAddress);
+  }
+
+  /**
+   * Initialize permission related settings.
+   *
+   * @param routerConf Router configuration.
+   * @throws IOException If the current user cannot be determined.
+   */
+  private static void initializePermissionSettings(Configuration routerConf)
+      throws IOException {
+    routerOwner = UserGroupInformation.getCurrentUser().getShortUserName();
+    superGroup = routerConf.get(
+        DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
+        DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
+    isPermissionEnabled = routerConf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY,
+        DFS_PERMISSIONS_ENABLED_DEFAULT);
+  }
+
+  /** Allow access to the client RPC server for testing. */
+  @VisibleForTesting
+  Server getAdminServer() {
+    return this.adminServer;
+  }
+
+  private MountTableStore getMountTableStore() throws IOException {
+    if (this.mountTableStore == null) {
+      this.mountTableStore = router.getStateStore().getRegisteredRecordStore(
+          MountTableStore.class);
+      if (this.mountTableStore == null) {
+        throw new IOException("Mount table state store is not available.");
+      }
+    }
+    return this.mountTableStore;
+  }
+
+  /**
+   * Get the RPC address of the admin service.
+   * @return Administration service RPC address.
+   */
+  public InetSocketAddress getRpcAddress() {
+    return this.adminAddress;
+  }
+
+  @Override
+  protected void serviceInit(Configuration configuration) throws Exception {
+    this.conf = configuration;
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    this.adminServer.start();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (this.adminServer != null) {
+      this.adminServer.stop();
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public AddMountTableEntryResponse addMountTableEntry(
+      AddMountTableEntryRequest request) throws IOException {
+    return getMountTableStore().addMountTableEntry(request);
+  }
+
+  @Override
+  public UpdateMountTableEntryResponse updateMountTableEntry(
+      UpdateMountTableEntryRequest request) throws IOException {
+    return getMountTableStore().updateMountTableEntry(request);
+  }
+
+  @Override
+  public RemoveMountTableEntryResponse removeMountTableEntry(
+      RemoveMountTableEntryRequest request) throws IOException {
+    return getMountTableStore().removeMountTableEntry(request);
+  }
+
+  @Override
+  public GetMountTableEntriesResponse getMountTableEntries(
+      GetMountTableEntriesRequest request) throws IOException {
+    return getMountTableStore().getMountTableEntries(request);
+  }
+
+  @Override
+  public EnterSafeModeResponse enterSafeMode(EnterSafeModeRequest request)
+      throws IOException {
+    this.router.updateRouterState(RouterServiceState.SAFEMODE);
+    this.router.getRpcServer().setSafeMode(true);
+    return EnterSafeModeResponse.newInstance(verifySafeMode(true));
+  }
+
+  @Override
+  public LeaveSafeModeResponse leaveSafeMode(LeaveSafeModeRequest request)
+      throws IOException {
+    this.router.updateRouterState(RouterServiceState.RUNNING);
+    this.router.getRpcServer().setSafeMode(false);
+    return LeaveSafeModeResponse.newInstance(verifySafeMode(false));
+  }
+
+  @Override
+  public GetSafeModeResponse getSafeMode(GetSafeModeRequest request)
+      throws IOException {
+    boolean isInSafeMode = this.router.getRpcServer().isInSafeMode();
+    return GetSafeModeResponse.newInstance(isInSafeMode);
+  }
+
+  /**
+   * Verify that the Router set the safe mode state correctly.
+   *
+   * @param isInSafeMode Expected state to be set.
+   * @return True if both the Router state and the RPC server agree with the
+   *         expected safe mode state.
+   */
+  private boolean verifySafeMode(boolean isInSafeMode) {
+    boolean serverInSafeMode = this.router.getRpcServer().isInSafeMode();
+    RouterServiceState currentState = this.router.getRouterState();
+
+    return (isInSafeMode && currentState == RouterServiceState.SAFEMODE
+        && serverInSafeMode)
+        || (!isInSafeMode && currentState != RouterServiceState.SAFEMODE
+            && !serverInSafeMode);
+  }
+
+  /**
+   * Get a new permission checker used for mount table access control. This
+   * method is invoked during each RPC call in the router admin server.
+   *
+   * @return Router permission checker, or null if permissions are disabled.
+   * @throws AccessControlException If the remote user cannot be determined.
+   */
+  public static RouterPermissionChecker getPermissionChecker()
+      throws AccessControlException {
+    if (!isPermissionEnabled) {
+      return null;
+    }
+
+    try {
+      return new RouterPermissionChecker(routerOwner, superGroup,
+          NameNode.getRemoteUser());
+    } catch (IOException e) {
+      throw new AccessControlException(e);
+    }
+  }
+
+  /**
+   * Get super user name.
+   *
+   * @return String super user name.
+   */
+  public static String getSuperUser() {
+    return routerOwner;
+  }
+
+  /**
+   * Get super group name.
+   *
+   * @return String super group name.
+   */
+  public static String getSuperGroup() {
+    return superGroup;
+  }
+}
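
Both enterSafeMode() and leaveSafeMode() above flip two pieces of state (the
Router's service state and the RPC server's safemode flag) and then confirm
that both agree via verifySafeMode(). A self-contained sketch of that
agreement check, with plain fields standing in for the Router and
RouterRpcServer (the names below are illustrative, not part of the patch):

    public class SafemodeCheckExample {

      enum ServiceState { RUNNING, SAFEMODE }

      // Stand-ins for Router.getRouterState() and RouterRpcServer.isInSafeMode().
      private ServiceState state = ServiceState.RUNNING;
      private boolean rpcServerInSafeMode = false;

      void enterSafeMode() {
        state = ServiceState.SAFEMODE;
        rpcServerInSafeMode = true;
      }

      void leaveSafeMode() {
        state = ServiceState.RUNNING;
        rpcServerInSafeMode = false;
      }

      // Mirrors verifySafeMode(): both views must match the expected state.
      boolean verifySafeMode(boolean expectedInSafeMode) {
        boolean inSafeModeState = (state == ServiceState.SAFEMODE);
        return expectedInSafeMode
            ? (inSafeModeState && rpcServerInSafeMode)
            : (!inSafeModeState && !rpcServerInSafeMode);
      }

      public static void main(String[] args) {
        SafemodeCheckExample r = new SafemodeCheckExample();
        r.enterSafeMode();
        System.out.println(r.verifySafeMode(true));   // true: both agree
        r.leaveSafeMode();
        System.out.println(r.verifySafeMode(false));  // true: both agree
      }
    }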

