http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
new file mode 100644
index 0000000..9788683
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
@@ -0,0 +1,1005 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.addDirectory;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.waitNamenodeRegistered;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HTTP_BIND_HOST_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology.NSConf;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service.STATE;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test utility to mimic a federated HDFS cluster with multiple routers.
+ */
+public class RouterDFSCluster {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterDFSCluster.class);
+
+  public static final String TEST_STRING = "teststring";
+  public static final String TEST_DIR = "testdir";
+  public static final String TEST_FILE = "testfile";
+
+
+  /** Nameservices in the federated cluster. */
+  private List<String> nameservices;
+  /** Namenodes in the federated cluster. */
+  private List<NamenodeContext> namenodes;
+  /** Routers in the federated cluster. */
+  private List<RouterContext> routers;
+  /** If the Namenodes are in high availability. */
+  private boolean highAvailability;
+  /** Number of datanodes per nameservice. */
+  private int numDatanodesPerNameservice = 2;
+
+  /** Mini cluster. */
+  private MiniDFSCluster cluster;
+
+  protected static final long DEFAULT_HEARTBEAT_INTERVAL_MS =
+      TimeUnit.SECONDS.toMillis(5);
+  protected static final long DEFAULT_CACHE_INTERVAL_MS =
+      TimeUnit.SECONDS.toMillis(5);
+  /** Heartbeat interval in milliseconds. */
+  private long heartbeatInterval;
+  /** Cache flush interval in milliseconds. */
+  private long cacheFlushInterval;
+
+  /** Router configuration overrides. */
+  private Configuration routerOverrides;
+  /** Namenode configuration overrides. */
+  private Configuration namenodeOverrides;
+
+
+  /**
+   * Router context.
+   */
+  public class RouterContext {
+    private Router router;
+    private FileContext fileContext;
+    private String nameserviceId;
+    private String namenodeId;
+    private int rpcPort;
+    private int httpPort;
+    private DFSClient client;
+    private Configuration conf;
+    private RouterClient adminClient;
+    private URI fileSystemUri;
+
+    public RouterContext(Configuration conf, String nsId, String nnId) {
+      this.conf = conf;
+      this.nameserviceId = nsId;
+      this.namenodeId = nnId;
+
+      this.router = new Router();
+      this.router.init(conf);
+    }
+
+    public Router getRouter() {
+      return this.router;
+    }
+
+    public String getNameserviceId() {
+      return this.nameserviceId;
+    }
+
+    public String getNamenodeId() {
+      return this.namenodeId;
+    }
+
+    public int getRpcPort() {
+      return this.rpcPort;
+    }
+
+    public int getHttpPort() {
+      return this.httpPort;
+    }
+
+    public FileContext getFileContext() {
+      return this.fileContext;
+    }
+
+    public void initRouter() throws URISyntaxException {
+      // Store the bound points for the router interfaces
+      InetSocketAddress rpcAddress = router.getRpcServerAddress();
+      if (rpcAddress != null) {
+        this.rpcPort = rpcAddress.getPort();
+        this.fileSystemUri =
+            URI.create("hdfs://" + NetUtils.getHostPortString(rpcAddress));
+        // Override the default FS to point to the router RPC
+        DistributedFileSystem.setDefaultUri(conf, fileSystemUri);
+        try {
+          this.fileContext = FileContext.getFileContext(conf);
+        } catch (UnsupportedFileSystemException e) {
+          this.fileContext = null;
+        }
+      }
+      InetSocketAddress httpAddress = router.getHttpServerAddress();
+      if (httpAddress != null) {
+        this.httpPort = httpAddress.getPort();
+      }
+    }
+
+    public FileSystem getFileSystem() throws IOException {
+      return DistributedFileSystem.get(conf);
+    }
+
+    public DFSClient getClient(UserGroupInformation user)
+        throws IOException, URISyntaxException, InterruptedException {
+
+      LOG.info("Connecting to router at {}", fileSystemUri);
+      return user.doAs(new PrivilegedExceptionAction<DFSClient>() {
+        @Override
+        public DFSClient run() throws IOException {
+          return new DFSClient(fileSystemUri, conf);
+        }
+      });
+    }
+
+    public RouterClient getAdminClient() throws IOException {
+      if (adminClient == null) {
+        InetSocketAddress routerSocket = router.getAdminServerAddress();
+        LOG.info("Connecting to router admin at {}", routerSocket);
+        adminClient = new RouterClient(routerSocket, conf);
+      }
+      return adminClient;
+    }
+
+    public DFSClient getClient() throws IOException, URISyntaxException {
+      if (client == null) {
+        LOG.info("Connecting to router at {}", fileSystemUri);
+        client = new DFSClient(fileSystemUri, conf);
+      }
+      return client;
+    }
+  }
+
+  /**
+   * Namenode context in the federated cluster.
+   */
+  public class NamenodeContext {
+    private Configuration conf;
+    private NameNode namenode;
+    private String nameserviceId;
+    private String namenodeId;
+    private FileContext fileContext;
+    private int rpcPort;
+    private int servicePort;
+    private int lifelinePort;
+    private int httpPort;
+    private URI fileSystemUri;
+    private int index;
+    private DFSClient client;
+
+    public NamenodeContext(
+        Configuration conf, String nsId, String nnId, int index) {
+      this.conf = conf;
+      this.nameserviceId = nsId;
+      this.namenodeId = nnId;
+      this.index = index;
+    }
+
+    public NameNode getNamenode() {
+      return this.namenode;
+    }
+
+    public String getNameserviceId() {
+      return this.nameserviceId;
+    }
+
+    public String getNamenodeId() {
+      return this.namenodeId;
+    }
+
+    public FileContext getFileContext() {
+      return this.fileContext;
+    }
+
+    public void setNamenode(NameNode nn) throws URISyntaxException {
+      this.namenode = nn;
+
+      // Store the bound ports and override the default FS with the local NN RPC
+      this.rpcPort = nn.getNameNodeAddress().getPort();
+      this.servicePort = nn.getServiceRpcAddress().getPort();
+      this.lifelinePort = nn.getServiceRpcAddress().getPort();
+      this.httpPort = nn.getHttpAddress().getPort();
+      this.fileSystemUri = new URI("hdfs://" + namenode.getHostAndPort());
+      DistributedFileSystem.setDefaultUri(this.conf, this.fileSystemUri);
+
+      try {
+        this.fileContext = FileContext.getFileContext(this.conf);
+      } catch (UnsupportedFileSystemException e) {
+        this.fileContext = null;
+      }
+    }
+
+    public String getRpcAddress() {
+      return namenode.getNameNodeAddress().getHostName() + ":" + rpcPort;
+    }
+
+    public String getServiceAddress() {
+      return namenode.getServiceRpcAddress().getHostName() + ":" + servicePort;
+    }
+
+    public String getLifelineAddress() {
+      return namenode.getServiceRpcAddress().getHostName() + ":" + lifelinePort;
+    }
+
+    public String getHttpAddress() {
+      return namenode.getHttpAddress().getHostName() + ":" + httpPort;
+    }
+
+    public FileSystem getFileSystem() throws IOException {
+      return DistributedFileSystem.get(conf);
+    }
+
+    public void resetClient() {
+      client = null;
+    }
+
+    public DFSClient getClient(UserGroupInformation user)
+        throws IOException, URISyntaxException, InterruptedException {
+
+      LOG.info("Connecting to namenode at {}", fileSystemUri);
+      return user.doAs(new PrivilegedExceptionAction<DFSClient>() {
+        @Override
+        public DFSClient run() throws IOException {
+          return new DFSClient(fileSystemUri, conf);
+        }
+      });
+    }
+
+    public DFSClient getClient() throws IOException, URISyntaxException {
+      if (client == null) {
+        LOG.info("Connecting to namenode at {}", fileSystemUri);
+        client = new DFSClient(fileSystemUri, conf);
+      }
+      return client;
+    }
+
+    public String getConfSuffix() {
+      String suffix = nameserviceId;
+      if (highAvailability) {
+        suffix += "." + namenodeId;
+      }
+      return suffix;
+    }
+  }
+
+  public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes,
+      long heartbeatInterval, long cacheFlushInterval) {
+    this.highAvailability = ha;
+    this.heartbeatInterval = heartbeatInterval;
+    this.cacheFlushInterval = cacheFlushInterval;
+    configureNameservices(numNameservices, numNamenodes);
+  }
+
+  public RouterDFSCluster(boolean ha, int numNameservices) {
+    this(ha, numNameservices, 2,
+        DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
+  }
+
+  public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes) {
+    this(ha, numNameservices, numNamenodes,
+        DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
+  }
+
+  /**
+   * Add configuration settings to override default Router settings.
+   *
+   * @param conf Router configuration overrides.
+   */
+  public void addRouterOverrides(Configuration conf) {
+    if (this.routerOverrides == null) {
+      this.routerOverrides = conf;
+    } else {
+      this.routerOverrides.addResource(conf);
+    }
+  }
+
+  /**
+   * Add configuration settings to override default Namenode settings.
+   *
+   * @param conf Namenode configuration overrides.
+   */
+  public void addNamenodeOverrides(Configuration conf) {
+    if (this.namenodeOverrides == null) {
+      this.namenodeOverrides = conf;
+    } else {
+      this.namenodeOverrides.addResource(conf);
+    }
+  }
+
+  /**
+   * Generate the configuration for a Namenode.
+   *
+   * @param nsId Nameservice identifier.
+   * @return New namenode configuration.
+   */
+  public Configuration generateNamenodeConfiguration(String nsId) {
+    Configuration conf = new HdfsConfiguration();
+
+    conf.set(DFS_NAMESERVICES, getNameservicesKey());
+    conf.set(FS_DEFAULT_NAME_KEY, "hdfs://" + nsId);
+
+    for (String ns : nameservices) {
+      if (highAvailability) {
+        conf.set(
+            DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
+            NAMENODES[0] + "," + NAMENODES[1]);
+      }
+
+      for (NamenodeContext context : getNamenodes(ns)) {
+        String suffix = context.getConfSuffix();
+
+        conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix,
+            "127.0.0.1:" + context.rpcPort);
+        conf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix,
+            "127.0.0.1:" + context.httpPort);
+        conf.set(DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + suffix,
+            "0.0.0.0");
+
+        // If the service RPC port is enabled, set up its addresses as well
+        boolean servicePortEnabled = false;
+        if (servicePortEnabled) {
+          conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + "." + suffix,
+              "127.0.0.1:" + context.servicePort);
+          conf.set(DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY + "." + suffix,
+              "0.0.0.0");
+        }
+      }
+    }
+
+    if (this.namenodeOverrides != null) {
+      conf.addResource(this.namenodeOverrides);
+    }
+    return conf;
+  }
+
+  /**
+   * Generate the configuration for a client.
+   *
+   * @return New configuration for a client.
+   */
+  public Configuration generateClientConfiguration() {
+    Configuration conf = new HdfsConfiguration(false);
+    String ns0 = getNameservices().get(0);
+    conf.addResource(generateNamenodeConfiguration(ns0));
+    return conf;
+  }
+
+  /**
+   * Generate the configuration for a Router.
+   *
+   * @param nsId Nameservice identifier.
+   * @param nnId Namenode identifier.
+   * @return New configuration for a Router.
+   */
+  public Configuration generateRouterConfiguration(String nsId, String nnId) {
+
+    Configuration conf = new HdfsConfiguration(false);
+    conf.addResource(generateNamenodeConfiguration(nsId));
+
+    conf.setInt(DFS_ROUTER_HANDLER_COUNT_KEY, 10);
+    conf.set(DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0");
+    conf.set(DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0");
+
+    conf.set(DFS_ROUTER_ADMIN_ADDRESS_KEY, "127.0.0.1:0");
+    conf.set(DFS_ROUTER_ADMIN_BIND_HOST_KEY, "0.0.0.0");
+
+    conf.set(DFS_ROUTER_HTTP_ADDRESS_KEY, "127.0.0.1:0");
+    conf.set(DFS_ROUTER_HTTPS_ADDRESS_KEY, "127.0.0.1:0");
+    conf.set(DFS_ROUTER_HTTP_BIND_HOST_KEY, "0.0.0.0");
+
+    conf.set(DFS_ROUTER_DEFAULT_NAMESERVICE, nameservices.get(0));
+    conf.setLong(DFS_ROUTER_HEARTBEAT_INTERVAL_MS, heartbeatInterval);
+    conf.setLong(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, cacheFlushInterval);
+
+    // Use mock resolver classes
+    conf.setClass(FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
+        MockResolver.class, ActiveNamenodeResolver.class);
+    conf.setClass(FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
+        MockResolver.class, FileSubclusterResolver.class);
+
+    // Set the nameservice ID for the default NN monitor
+    conf.set(DFS_NAMESERVICE_ID, nsId);
+    if (nnId != null) {
+      conf.set(DFS_HA_NAMENODE_ID_KEY, nnId);
+    }
+
+    // Namenodes to monitor
+    StringBuilder sb = new StringBuilder();
+    for (String ns : this.nameservices) {
+      for (NamenodeContext context : getNamenodes(ns)) {
+        String suffix = context.getConfSuffix();
+        if (sb.length() != 0) {
+          sb.append(",");
+        }
+        sb.append(suffix);
+      }
+    }
+    conf.set(DFS_ROUTER_MONITOR_NAMENODE, sb.toString());
+
+    // Add custom overrides if available
+    if (this.routerOverrides != null) {
+      for (Entry<String, String> entry : this.routerOverrides) {
+        String confKey = entry.getKey();
+        String confValue = entry.getValue();
+        conf.set(confKey, confValue);
+      }
+    }
+    return conf;
+  }
+
+  public void configureNameservices(int numNameservices, int numNamenodes) {
+    this.nameservices = new ArrayList<>();
+    this.namenodes = new ArrayList<>();
+
+    NamenodeContext context = null;
+    int nnIndex = 0;
+    for (int i=0; i<numNameservices; i++) {
+      String ns = "ns" + i;
+      this.nameservices.add("ns" + i);
+
+      Configuration nnConf = generateNamenodeConfiguration(ns);
+      if (!highAvailability) {
+        context = new NamenodeContext(nnConf, ns, null, nnIndex++);
+        this.namenodes.add(context);
+      } else {
+        for (int j=0; j<numNamenodes; j++) {
+          context = new NamenodeContext(nnConf, ns, NAMENODES[j], nnIndex++);
+          this.namenodes.add(context);
+        }
+      }
+    }
+  }
+
+  public void setNumDatanodesPerNameservice(int num) {
+    this.numDatanodesPerNameservice = num;
+  }
+
+  public String getNameservicesKey() {
+    StringBuilder sb = new StringBuilder();
+    for (String nsId : this.nameservices) {
+      if (sb.length() > 0) {
+        sb.append(",");
+      }
+      sb.append(nsId);
+    }
+    return sb.toString();
+  }
+
+  public String getRandomNameservice() {
+    Random r = new Random();
+    int randIndex = r.nextInt(nameservices.size());
+    return nameservices.get(randIndex);
+  }
+
+  public List<String> getNameservices() {
+    return nameservices;
+  }
+
+  public List<NamenodeContext> getNamenodes(String nameservice) {
+    List<NamenodeContext> nns = new ArrayList<>();
+    for (NamenodeContext c : namenodes) {
+      if (c.nameserviceId.equals(nameservice)) {
+        nns.add(c);
+      }
+    }
+    return nns;
+  }
+
+  public NamenodeContext getRandomNamenode() {
+    Random rand = new Random();
+    int i = rand.nextInt(this.namenodes.size());
+    return this.namenodes.get(i);
+  }
+
+  public List<NamenodeContext> getNamenodes() {
+    return this.namenodes;
+  }
+
+  public boolean isHighAvailability() {
+    return highAvailability;
+  }
+
+  public NamenodeContext getNamenode(String nameservice, String namenode) {
+    for (NamenodeContext c : this.namenodes) {
+      if (c.nameserviceId.equals(nameservice)) {
+        if (namenode == null || namenode.isEmpty() ||
+            c.namenodeId == null || c.namenodeId.isEmpty()) {
+          return c;
+        } else if (c.namenodeId.equals(namenode)) {
+          return c;
+        }
+      }
+    }
+    return null;
+  }
+
+  public List<RouterContext> getRouters(String nameservice) {
+    List<RouterContext> nns = new ArrayList<>();
+    for (RouterContext c : routers) {
+      if (c.nameserviceId.equals(nameservice)) {
+        nns.add(c);
+      }
+    }
+    return nns;
+  }
+
+  public RouterContext getRouterContext(String nsId, String nnId) {
+    for (RouterContext c : routers) {
+      if (nnId == null) {
+        return c;
+      }
+      if (c.namenodeId.equals(nnId) &&
+          c.nameserviceId.equals(nsId)) {
+        return c;
+      }
+    }
+    return null;
+  }
+
+  public RouterContext getRandomRouter() {
+    Random rand = new Random();
+    return routers.get(rand.nextInt(routers.size()));
+  }
+
+  public List<RouterContext> getRouters() {
+    return routers;
+  }
+
+  public RouterContext buildRouter(String nsId, String nnId)
+      throws URISyntaxException, IOException {
+    Configuration config = generateRouterConfiguration(nsId, nnId);
+    RouterContext rc = new RouterContext(config, nsId, nnId);
+    return rc;
+  }
+
+  public void startCluster() {
+    startCluster(null);
+  }
+
+  public void startCluster(Configuration overrideConf) {
+    try {
+      MiniDFSNNTopology topology = new MiniDFSNNTopology();
+      for (String ns : nameservices) {
+        NSConf conf = new MiniDFSNNTopology.NSConf(ns);
+        if (highAvailability) {
+          for (int i=0; i<namenodes.size()/nameservices.size(); i++) {
+            NNConf nnConf = new MiniDFSNNTopology.NNConf("nn" + i);
+            conf.addNN(nnConf);
+          }
+        } else {
+          NNConf nnConf = new MiniDFSNNTopology.NNConf(null);
+          conf.addNN(nnConf);
+        }
+        topology.addNameservice(conf);
+      }
+      topology.setFederation(true);
+
+      // Start mini DFS cluster
+      String ns0 = nameservices.get(0);
+      Configuration nnConf = generateNamenodeConfiguration(ns0);
+      if (overrideConf != null) {
+        nnConf.addResource(overrideConf);
+      }
+      cluster = new MiniDFSCluster.Builder(nnConf)
+          .numDataNodes(nameservices.size() * numDatanodesPerNameservice)
+          .nnTopology(topology)
+          .build();
+      cluster.waitActive();
+
+      // Store NN pointers
+      for (int i = 0; i < namenodes.size(); i++) {
+        NameNode nn = cluster.getNameNode(i);
+        namenodes.get(i).setNamenode(nn);
+      }
+
+    } catch (Exception e) {
+      LOG.error("Cannot start Router DFS cluster: {}", e.getMessage(), e);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  public void startRouters()
+      throws InterruptedException, URISyntaxException, IOException {
+
+    // Create one router per nameservice
+    this.routers = new ArrayList<>();
+    for (String ns : this.nameservices) {
+      for (NamenodeContext context : getNamenodes(ns)) {
+        RouterContext router = buildRouter(ns, context.namenodeId);
+        this.routers.add(router);
+      }
+    }
+
+    // Start all routers
+    for (RouterContext router : this.routers) {
+      router.router.start();
+    }
+
+    // Wait until all routers are active and record their ports
+    for (RouterContext router : this.routers) {
+      waitActive(router);
+      router.initRouter();
+    }
+  }
+
+  public void waitActive(NamenodeContext nn) throws IOException {
+    cluster.waitActive(nn.index);
+  }
+
+  public void waitActive(RouterContext router)
+      throws InterruptedException {
+    for (int loopCount = 0; loopCount < 20; loopCount++) {
+      // Validate connection of routers to NNs
+      if (router.router.getServiceState() == STATE.STARTED) {
+        return;
+      }
+      Thread.sleep(1000);
+    }
+    fail("Timeout waiting for " + router.router + " to activate");
+  }
+
+  public void registerNamenodes() throws IOException {
+    for (RouterContext r : this.routers) {
+      ActiveNamenodeResolver resolver = r.router.getNamenodeResolver();
+      for (NamenodeContext nn : this.namenodes) {
+        // Generate a report
+        NamenodeStatusReport report = new NamenodeStatusReport(
+            nn.nameserviceId, nn.namenodeId,
+            nn.getRpcAddress(), nn.getServiceAddress(),
+            nn.getLifelineAddress(), nn.getHttpAddress());
+        FSImage fsImage = nn.namenode.getNamesystem().getFSImage();
+        NamespaceInfo nsInfo = fsImage.getStorage().getNamespaceInfo();
+        report.setNamespaceInfo(nsInfo);
+
+        // Determine HA state from nn public state string
+        String nnState = nn.namenode.getState();
+        HAServiceState haState = HAServiceState.ACTIVE;
+        for (HAServiceState state : HAServiceState.values()) {
+          if (nnState.equalsIgnoreCase(state.name())) {
+            haState = state;
+            break;
+          }
+        }
+        report.setHAServiceState(haState);
+
+        // Register with the resolver
+        resolver.registerNamenode(report);
+      }
+    }
+  }
+
+  public void waitNamenodeRegistration() throws Exception {
+    for (RouterContext r : this.routers) {
+      Router router = r.router;
+      for (NamenodeContext nn : this.namenodes) {
+        ActiveNamenodeResolver nnResolver = router.getNamenodeResolver();
+        waitNamenodeRegistered(
+            nnResolver, nn.nameserviceId, nn.namenodeId, null);
+      }
+    }
+  }
+
+  public void waitRouterRegistrationQuorum(RouterContext router,
+      FederationNamenodeServiceState state, String nsId, String nnId)
+          throws Exception {
+    LOG.info("Waiting for NN {} {} to transition to {}", nsId, nnId, state);
+    ActiveNamenodeResolver nnResolver = router.router.getNamenodeResolver();
+    waitNamenodeRegistered(nnResolver, nsId, nnId, state);
+  }
+
+  /**
+   * Get the federated path for a nameservice.
+   * @param nsId Nameservice identifier.
+   * @return Path in the Router.
+   */
+  public String getFederatedPathForNS(String nsId) {
+    return "/" + nsId;
+  }
+
+  /**
+   * Get the namenode path for a nameservice.
+   * @param nsId Nameservice identifier.
+   * @return Path in the Namenode.
+   */
+  public String getNamenodePathForNS(String nsId) {
+    return "/target-" + nsId;
+  }
+
+  /**
+   * Get the federated test directory for a nameservice.
+   * @param nsId Nameservice identifier.
+   * @return Example:
+   *         <ul>
+   *         <li>/ns0/testdir which maps to ns0->/target-ns0/testdir
+   *         </ul>
+   */
+  public String getFederatedTestDirectoryForNS(String nsId) {
+    return getFederatedPathForNS(nsId) + "/" + TEST_DIR;
+  }
+
+  /**
+   * Get the namenode test directory for a nameservice.
+   * @param nsId Nameservice identifier.
+   * @return example:
+   *         <ul>
+   *         <li>/target-ns0/testdir
+   *         </ul>
+   */
+  public String getNamenodeTestDirectoryForNS(String nsId) {
+    return getNamenodePathForNS(nsId) + "/" + TEST_DIR;
+  }
+
+  /**
+   * Get the federated test file for a nameservice.
+   * @param nsId Nameservice identifier.
+   * @return example:
+   *         <ul>
+   *         <li>/ns0/testfile which maps to ns0->/target-ns0/testfile
+   *         </ul>
+   */
+  public String getFederatedTestFileForNS(String nsId) {
+    return getFederatedPathForNS(nsId) + "/" + TEST_FILE;
+  }
+
+  /**
+   * Get the namenode test file for a nameservice.
+   * @param nsId Nameservice identifier.
+   * @return example:
+   *         <ul>
+   *         <li>/target-ns0/testfile
+   *         </ul>
+   */
+  public String getNamenodeTestFileForNS(String nsId) {
+    return getNamenodePathForNS(nsId) + "/" + TEST_FILE;
+  }
+
+  /**
+   * Switch a namenode in a nameservice to be the active.
+   * @param nsId Nameservice identifier.
+   * @param nnId Namenode identifier.
+   */
+  public void switchToActive(String nsId, String nnId) {
+    try {
+      int total = cluster.getNumNameNodes();
+      NameNodeInfo[] nns = cluster.getNameNodeInfos();
+      for (int i = 0; i < total; i++) {
+        NameNodeInfo nn = nns[i];
+        if (nn.getNameserviceId().equals(nsId) &&
+            nn.getNamenodeId().equals(nnId)) {
+          cluster.transitionToActive(i);
+        }
+      }
+    } catch (Throwable e) {
+      LOG.error("Cannot transition to active", e);
+    }
+  }
+
+  /**
+   * Switch a namenode in a nameservice to be in standby.
+   * @param nsId Nameservice identifier.
+   * @param nnId Namenode identifier.
+   */
+  public void switchToStandby(String nsId, String nnId) {
+    try {
+      int total = cluster.getNumNameNodes();
+      NameNodeInfo[] nns = cluster.getNameNodeInfos();
+      for (int i = 0; i < total; i++) {
+        NameNodeInfo nn = nns[i];
+        if (nn.getNameserviceId().equals(nsId) &&
+            nn.getNamenodeId().equals(nnId)) {
+          cluster.transitionToStandby(i);
+        }
+      }
+    } catch (Throwable e) {
+      LOG.error("Cannot transition to standby", e);
+    }
+  }
+
+  /**
+   * Stop the federated HDFS cluster.
+   */
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    if (routers != null) {
+      for (RouterContext context : routers) {
+        stopRouter(context);
+      }
+    }
+  }
+
+  /**
+   * Stop a router.
+   * @param router Router context.
+   */
+  public void stopRouter(RouterContext router) {
+    try {
+      router.router.shutDown();
+
+      int loopCount = 0;
+      while (router.router.getServiceState() != STATE.STOPPED) {
+        loopCount++;
+        Thread.sleep(1000);
+        if (loopCount > 20) {
+          LOG.error("Cannot shutdown router {}", router.rpcPort);
+          break;
+        }
+      }
+    } catch (InterruptedException e) {
+      // Ignore interruptions while waiting for the router to stop
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+  // Namespace Test Fixtures
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Creates test directories via the namenode.
+   * 1) /target-ns0/testdir
+   * 2) /target-ns1/testdir
+   * @throws IOException If a test directory cannot be created.
+   */
+  public void createTestDirectoriesNamenode() throws IOException {
+    // Add a test dir to each NS and verify
+    for (String ns : getNameservices()) {
+      NamenodeContext context = getNamenode(ns, null);
+      if (!createTestDirectoriesNamenode(context)) {
+        throw new IOException("Cannot create test directory for ns " + ns);
+      }
+    }
+  }
+
+  public boolean createTestDirectoriesNamenode(NamenodeContext nn)
+      throws IOException {
+    FileSystem fs = nn.getFileSystem();
+    String testDir = getNamenodeTestDirectoryForNS(nn.nameserviceId);
+    return addDirectory(fs, testDir);
+  }
+
+  public void deleteAllFiles() throws IOException {
+    // Delete all files via the NNs and verify
+    for (NamenodeContext context : getNamenodes()) {
+      FileSystem fs = context.getFileSystem();
+      FileStatus[] status = fs.listStatus(new Path("/"));
+      for (int i = 0; i < status.length; i++) {
+        Path p = status[i].getPath();
+        fs.delete(p, true);
+      }
+      status = fs.listStatus(new Path("/"));
+      assertEquals(0, status.length);
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+  // MockRouterResolver Test Fixtures
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * <ul>
+   * <li>/ -> [ns0->/].
+   * <li>/ns0 -> ns0->/target-ns0.
+   * <li>/ns1 -> ns1->/target-ns1.
+   * </ul>
+   */
+  public void installMockLocations() {
+    for (RouterContext r : routers) {
+      MockResolver resolver =
+          (MockResolver) r.router.getSubclusterResolver();
+      // create table entries
+      for (String nsId : nameservices) {
+        // Direct path
+        String routerPath = getFederatedPathForNS(nsId);
+        String nnPath = getNamenodePathForNS(nsId);
+        resolver.addLocation(routerPath, nsId, nnPath);
+      }
+
+      // Root path points to the first nameservice
+      String ns0 = nameservices.get(0);
+      resolver.addLocation("/", ns0, "/");
+    }
+  }
+
+  public MiniDFSCluster getCluster() {
+    return cluster;
+  }
+
+  /**
+   * Wait until the federated cluster is up and ready.
+   * @throws IOException If we cannot wait for the cluster to be up.
+   */
+  public void waitClusterUp() throws IOException {
+    cluster.waitClusterUp();
+    registerNamenodes();
+    try {
+      waitNamenodeRegistration();
+    } catch (Exception e) {
+      throw new IOException("Cannot wait for the namenodes", e);
+    }
+  }
+}
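
For orientation, here is a minimal sketch of how a test might drive the
utility above. The JUnit wiring and the assertion are illustrative
assumptions, not part of this patch; the cluster methods are the ones
defined in RouterDFSCluster.

import static org.junit.Assert.assertTrue;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TestRouterDFSClusterUsage {

  private RouterDFSCluster cluster;

  @Before
  public void setup() throws Exception {
    // 2 nameservices, HA with 2 NNs each (see the constructors above)
    cluster = new RouterDFSCluster(true, 2);
    cluster.startCluster();             // mini DFS cluster with federation
    cluster.startRouters();             // one router per namenode
    cluster.registerNamenodes();        // report the NNs to the mock resolver
    cluster.waitNamenodeRegistration();
    cluster.installMockLocations();     // /ns0 -> ns0:/target-ns0, etc.
  }

  @After
  public void teardown() {
    cluster.shutdown();
  }

  @Test
  public void testMkdirThroughRouter() throws Exception {
    RouterContext router = cluster.getRandomRouter();
    FileSystem fs = router.getFileSystem();
    String dir = cluster.getFederatedTestDirectoryForNS("ns0");
    assertTrue(fs.mkdirs(new Path(dir)));
  }
}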

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java
new file mode 100644
index 0000000..aa1906f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation;
+
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockRegistrationForNamenode;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+
+/**
+ * Test utility to mimic a federated HDFS cluster with a router and a state
+ * store.
+ */
+public class StateStoreDFSCluster extends RouterDFSCluster {
+
+  private static final Class<?> DEFAULT_FILE_RESOLVER =
+      MountTableResolver.class;
+  private static final Class<?> DEFAULT_NAMENODE_RESOLVER =
+      MembershipNamenodeResolver.class;
+
+  public StateStoreDFSCluster(boolean ha, int numNameservices, int numNamenodes,
+      long heartbeatInterval, long cacheFlushInterval)
+          throws IOException, InterruptedException {
+    this(ha, numNameservices, numNamenodes, heartbeatInterval,
+        cacheFlushInterval, DEFAULT_FILE_RESOLVER);
+  }
+
+  public StateStoreDFSCluster(boolean ha, int numNameservices, int numNamenodes,
+      long heartbeatInterval, long cacheFlushInterval, Class<?> fileResolver)
+          throws IOException, InterruptedException {
+    super(ha, numNameservices, numNamenodes, heartbeatInterval,
+        cacheFlushInterval);
+
+    // Attach state store and resolvers to router
+    Configuration stateStoreConfig = getStateStoreConfiguration();
+    // Use state store backed resolvers
+    stateStoreConfig.setClass(
+        RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
+        DEFAULT_NAMENODE_RESOLVER, ActiveNamenodeResolver.class);
+    stateStoreConfig.setClass(
+        RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
+        fileResolver, FileSubclusterResolver.class);
+    this.addRouterOverrides(stateStoreConfig);
+  }
+
+  public StateStoreDFSCluster(boolean ha, int numNameservices,
+      Class<?> fileResolver) throws IOException, InterruptedException {
+    this(ha, numNameservices, 2,
+        DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS, fileResolver);
+  }
+
+  public StateStoreDFSCluster(boolean ha, int numNameservices)
+      throws IOException, InterruptedException {
+    this(ha, numNameservices, 2,
+        DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
+  }
+
+  public StateStoreDFSCluster(boolean ha, int numNameservices,
+      int numNamenodes) throws IOException, InterruptedException {
+    this(ha, numNameservices, numNamenodes,
+        DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+  // State Store Test Fixtures
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Adds test fixtures for NN registration, for each NN:
+   * <ul>
+   * <li>nameservice -> NS
+   * <li>namenode -> NN
+   * <li>rpcAddress -> 0.0.0.0:0
+   * <li>webAddress -> 0.0.0.0:0
+   * <li>state -> STANDBY
+   * <li>safeMode -> false
+   * <li>blockPool -> test
+   * </ul>
+   *
+   * @param stateStore State Store.
+   * @throws IOException If it cannot register.
+   */
+  public void createTestRegistration(StateStoreService stateStore)
+      throws IOException {
+    List<MembershipState> entries = new ArrayList<>();
+    for (NamenodeContext nn : this.getNamenodes()) {
+      MembershipState entry = createMockRegistrationForNamenode(
+          nn.getNameserviceId(), nn.getNamenodeId(),
+          FederationNamenodeServiceState.STANDBY);
+      entries.add(entry);
+    }
+    synchronizeRecords(
+        stateStore, entries, MembershipState.class);
+  }
+
+  public void createTestMountTable(StateStoreService stateStore)
+      throws IOException {
+    List<MountTable> mounts = generateMockMountTable();
+    synchronizeRecords(stateStore, mounts, MountTable.class);
+    stateStore.refreshCaches();
+  }
+
+  public List<MountTable> generateMockMountTable() throws IOException {
+    // create table entries
+    List<MountTable> entries = new ArrayList<>();
+    for (String ns : this.getNameservices()) {
+      Map<String, String> destMap = new HashMap<>();
+      destMap.put(ns, getNamenodePathForNS(ns));
+
+      // Direct path
+      String fedPath = getFederatedPathForNS(ns);
+      MountTable entry = MountTable.newInstance(fedPath, destMap);
+      entries.add(entry);
+    }
+
+    // Root path goes to the first nameservice
+    Map<String, String> destMap = new HashMap<>();
+    String ns0 = this.getNameservices().get(0);
+    destMap.put(ns0, "/");
+    MountTable entry = MountTable.newInstance("/", destMap);
+    entries.add(entry);
+    return entries;
+  }
+}
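
A similar sketch for the state store variant. The driver wiring below is an
illustrative assumption; getStateStore() on the Router and the fixture
methods are the ones used and defined in this patch.

import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;

public class StateStoreClusterExample {
  public static void main(String[] args) throws Exception {
    // Non-HA, 2 nameservices; MountTableResolver is the default file resolver
    StateStoreDFSCluster cluster = new StateStoreDFSCluster(false, 2);
    cluster.startCluster();
    cluster.startRouters();

    // Seed one router's state store with the mock fixtures defined above
    StateStoreService stateStore =
        cluster.getRandomRouter().getRouter().getStateStore();
    cluster.createTestRegistration(stateStore); // one STANDBY entry per NN
    cluster.createTestMountTable(stateStore);   // /ns0 -> ns0:/target-ns0, ...

    cluster.shutdown();
  }
}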

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java
new file mode 100644
index 0000000..94799f3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getBean;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.management.MalformedObjectNameException;
+
+import org.apache.commons.collections.ListUtils;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
+import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.junit.Test;
+
+/**
+ * Test the JMX interface for the {@link Router}.
+ */
+public class TestFederationMetrics extends TestMetricsBase {
+
+  public static final String FEDERATION_BEAN =
+      "Hadoop:service=Router,name=FederationState";
+  public static final String STATE_STORE_BEAN =
+      "Hadoop:service=Router,name=StateStore";
+  public static final String RPC_BEAN =
+      "Hadoop:service=Router,name=FederationRPC";
+
+  @Test
+  public void testClusterStatsJMX()
+      throws MalformedObjectNameException, IOException {
+
+    FederationMBean bean = getBean(FEDERATION_BEAN, FederationMBean.class);
+    validateClusterStatsBean(bean);
+  }
+
+  @Test
+  public void testClusterStatsDataSource() throws IOException {
+    FederationMetrics metrics = getRouter().getMetrics();
+    validateClusterStatsBean(metrics);
+  }
+
+  @Test
+  public void testMountTableStatsDataSource()
+      throws IOException, JSONException {
+
+    FederationMetrics metrics = getRouter().getMetrics();
+    String jsonString = metrics.getMountTable();
+    JSONArray jsonArray = new JSONArray(jsonString);
+    assertEquals(jsonArray.length(), getMockMountTable().size());
+
+    int match = 0;
+    for (int i = 0; i < jsonArray.length(); i++) {
+      JSONObject json = jsonArray.getJSONObject(i);
+      String src = json.getString("sourcePath");
+
+      for (MountTable entry : getMockMountTable()) {
+        if (entry.getSourcePath().equals(src)) {
+          assertEquals(entry.getDefaultLocation().getNameserviceId(),
+              json.getString("nameserviceId"));
+          assertEquals(entry.getDefaultLocation().getDest(),
+              json.getString("path"));
+          assertEquals(entry.getOwnerName(), json.getString("ownerName"));
+          assertEquals(entry.getGroupName(), json.getString("groupName"));
+          assertEquals(entry.getMode().toString(), json.getString("mode"));
+          assertEquals(entry.getQuota().toString(), json.getString("quota"));
+          assertNotNullAndNotEmpty(json.getString("dateCreated"));
+          assertNotNullAndNotEmpty(json.getString("dateModified"));
+          match++;
+        }
+      }
+    }
+    assertEquals(match, getMockMountTable().size());
+  }
+
+  private MembershipState findMockNamenode(String nsId, String nnId) {
+
+    @SuppressWarnings("unchecked")
+    List<MembershipState> namenodes =
+        ListUtils.union(getActiveMemberships(), getStandbyMemberships());
+    for (MembershipState nn : namenodes) {
+      if (nn.getNamenodeId().equals(nnId)
+          && nn.getNameserviceId().equals(nsId)) {
+        return nn;
+      }
+    }
+    return null;
+  }
+
+  @Test
+  public void testNamenodeStatsDataSource() throws IOException, JSONException {
+
+    FederationMetrics metrics = getRouter().getMetrics();
+    String jsonString = metrics.getNamenodes();
+    JSONObject jsonObject = new JSONObject(jsonString);
+    Iterator<?> keys = jsonObject.keys();
+    int nnsFound = 0;
+    while (keys.hasNext()) {
+      // Validate each entry against our mocks
+      JSONObject json = jsonObject.getJSONObject((String) keys.next());
+      String nameserviceId = json.getString("nameserviceId");
+      String namenodeId = json.getString("namenodeId");
+
+      MembershipState mockEntry =
+          this.findMockNamenode(nameserviceId, namenodeId);
+      assertNotNull(mockEntry);
+
+      assertEquals(json.getString("state"), mockEntry.getState().toString());
+      MembershipStats stats = mockEntry.getStats();
+      assertEquals(json.getLong("numOfActiveDatanodes"),
+          stats.getNumOfActiveDatanodes());
+      assertEquals(json.getLong("numOfDeadDatanodes"),
+          stats.getNumOfDeadDatanodes());
+      assertEquals(json.getLong("numOfDecommissioningDatanodes"),
+          stats.getNumOfDecommissioningDatanodes());
+      assertEquals(json.getLong("numOfDecomActiveDatanodes"),
+          stats.getNumOfDecomActiveDatanodes());
+      assertEquals(json.getLong("numOfDecomDeadDatanodes"),
+          stats.getNumOfDecomDeadDatanodes());
+      assertEquals(json.getLong("numOfBlocks"), stats.getNumOfBlocks());
+      assertEquals(json.getString("rpcAddress"), mockEntry.getRpcAddress());
+      assertEquals(json.getString("webAddress"), mockEntry.getWebAddress());
+      nnsFound++;
+    }
+    // Validate all memberships are present
+    assertEquals(getActiveMemberships().size() + getStandbyMemberships().size(),
+        nnsFound);
+  }
+
+  @Test
+  public void testNameserviceStatsDataSource()
+      throws IOException, JSONException {
+
+    FederationMetrics metrics = getRouter().getMetrics();
+    String jsonString = metrics.getNameservices();
+    JSONObject jsonObject = new JSONObject(jsonString);
+    Iterator<?> keys = jsonObject.keys();
+    int nameservicesFound = 0;
+    while (keys.hasNext()) {
+      JSONObject json = jsonObject.getJSONObject((String) keys.next());
+      String nameserviceId = json.getString("nameserviceId");
+      String namenodeId = json.getString("namenodeId");
+
+      MembershipState mockEntry =
+          this.findMockNamenode(nameserviceId, namenodeId);
+      assertNotNull(mockEntry);
+
+      // NS should report the active NN
+      assertEquals(mockEntry.getState().toString(), json.getString("state"));
+      assertEquals("ACTIVE", json.getString("state"));
+
+      // Stats in the NS should reflect the stats for the most active NN
+      MembershipStats stats = mockEntry.getStats();
+      assertEquals(stats.getNumOfFiles(), json.getLong("numOfFiles"));
+      assertEquals(stats.getTotalSpace(), json.getLong("totalSpace"));
+      assertEquals(stats.getAvailableSpace(),
+          json.getLong("availableSpace"));
+      assertEquals(stats.getNumOfBlocksMissing(),
+          json.getLong("numOfBlocksMissing"));
+      assertEquals(stats.getNumOfActiveDatanodes(),
+          json.getLong("numOfActiveDatanodes"));
+      assertEquals(stats.getNumOfDeadDatanodes(),
+          json.getLong("numOfDeadDatanodes"));
+      assertEquals(stats.getNumOfDecommissioningDatanodes(),
+          json.getLong("numOfDecommissioningDatanodes"));
+      assertEquals(stats.getNumOfDecomActiveDatanodes(),
+          json.getLong("numOfDecomActiveDatanodes"));
+      assertEquals(stats.getNumOfDecomDeadDatanodes(),
+          json.getLong("numOfDecomDeadDatanodes"));
+      assertEquals(stats.getProvidedSpace(),
+          json.getLong("providedSpace"));
+      nameservicesFound++;
+    }
+    assertEquals(getNameservices().size(), nameservicesFound);
+  }
+
+  @Test
+  public void testRouterStatsDataSource() throws IOException, JSONException {
+
+    FederationMetrics metrics = getRouter().getMetrics();
+    String jsonString = metrics.getRouters();
+    JSONObject jsonObject = new JSONObject(jsonString);
+    Iterator<?> keys = jsonObject.keys();
+    int routersFound = 0;
+    while (keys.hasNext()) {
+      JSONObject json = jsonObject.getJSONObject((String) keys.next());
+      String address = json.getString("address");
+      assertNotNullAndNotEmpty(address);
+      RouterState router = findMockRouter(address);
+      assertNotNull(router);
+
+      assertEquals(router.getStatus().toString(), json.getString("status"));
+      assertEquals(router.getCompileInfo(), json.getString("compileInfo"));
+      assertEquals(router.getVersion(), json.getString("version"));
+      assertEquals(router.getDateStarted(), json.getLong("dateStarted"));
+      assertEquals(router.getDateCreated(), json.getLong("dateCreated"));
+      assertEquals(router.getDateModified(), json.getLong("dateModified"));
+
+      StateStoreVersion version = router.getStateStoreVersion();
+      assertEquals(
+          FederationMetrics.getDateString(version.getMembershipVersion()),
+          json.get("lastMembershipUpdate"));
+      assertEquals(
+          FederationMetrics.getDateString(version.getMountTableVersion()),
+          json.get("lastMountTableUpdate"));
+      assertEquals(version.getMembershipVersion(),
+          json.get("membershipVersion"));
+      assertEquals(version.getMountTableVersion(),
+          json.get("mountTableVersion"));
+      routersFound++;
+    }
+
+    assertEquals(getMockRouters().size(), routersFound);
+  }
+
+  private void assertNotNullAndNotEmpty(String field) {
+    assertNotNull(field);
+    assertTrue(field.length() > 0);
+  }
+
+  private RouterState findMockRouter(String routerId) {
+    for (RouterState router : getMockRouters()) {
+      if (router.getAddress().equals(routerId)) {
+        return router;
+      }
+    }
+    return null;
+  }
+
+  private void validateClusterStatsBean(FederationMBean bean)
+      throws IOException {
+
+    // Determine aggregates
+    long numBlocks = 0;
+    long numLive = 0;
+    long numDead = 0;
+    long numDecom = 0;
+    long numDecomLive = 0;
+    long numDecomDead = 0;
+    long numFiles = 0;
+    for (MembershipState mock : getActiveMemberships()) {
+      MembershipStats stats = mock.getStats();
+      numBlocks += stats.getNumOfBlocks();
+      numLive += stats.getNumOfActiveDatanodes();
+      numDead += stats.getNumOfDeadDatanodes();
+      numDecom += stats.getNumOfDecommissioningDatanodes();
+      numDecomLive += stats.getNumOfDecomActiveDatanodes();
+      numDecomDead += stats.getNumOfDecomDeadDatanodes();
+    }
+
+    assertEquals(numBlocks, bean.getNumBlocks());
+    assertEquals(numLive, bean.getNumLiveNodes());
+    assertEquals(numDead, bean.getNumDeadNodes());
+    assertEquals(numDecom, bean.getNumDecommissioningNodes());
+    assertEquals(numDecomLive, bean.getNumDecomLiveNodes());
+    assertEquals(numDecomDead, bean.getNumDecomDeadNodes());
+    assertEquals(numFiles, bean.getNumFiles());
+    assertEquals(getActiveMemberships().size() + getStandbyMemberships().size(),
+        bean.getNumNamenodes());
+    assertEquals(getNameservices().size(), bean.getNumNameservices());
+    assertTrue(bean.getVersion().length() > 0);
+    assertTrue(bean.getCompiledDate().length() > 0);
+    assertTrue(bean.getCompileInfo().length() > 0);
+    assertTrue(bean.getRouterStarted().length() > 0);
+    assertTrue(bean.getHostAndPort().length() > 0);
+  }
+}
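
The getBean helper used in testClusterStatsJMX is imported from
FederationTestUtils and is not shown in this patch. A JMX proxy lookup of
the following shape (standard platform MBeanServer API) is one plausible
implementation sketch:

import java.lang.management.ManagementFactory;

import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public final class JmxBeanLookup {

  private JmxBeanLookup() {
  }

  // Build a typed proxy for an MBean registered under the given name,
  // e.g. "Hadoop:service=Router,name=FederationState"
  public static <T> T getBean(String name, Class<T> clazz)
      throws MalformedObjectNameException {
    MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    ObjectName oname = new ObjectName(name);
    return JMX.newMBeanProxy(server, oname, clazz);
  }
}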

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java
new file mode 100644
index 0000000..2169b21
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.metrics;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearAllRecords;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockMountTable;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockRegistrationForNamenode;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.waitStateStore;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
+import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
+import org.apache.hadoop.util.Time;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Test the basic metrics functionality.
+ */
+public class TestMetricsBase {
+
+  private StateStoreService stateStore;
+  private MembershipStore membershipStore;
+  private RouterStore routerStore;
+  private Router router;
+  private Configuration routerConfig;
+
+  private List<MembershipState> activeMemberships;
+  private List<MembershipState> standbyMemberships;
+  private List<MountTable> mockMountTable;
+  private List<RouterState> mockRouters;
+  private List<String> nameservices;
+
+  @Before
+  public void setupBase() throws Exception {
+
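+    // Start a fresh Router backed by the State Store; the null check
+    // guards against double initialization from subclass setup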
+    if (router == null) {
+      routerConfig = new RouterConfigBuilder()
+          .stateStore()
+          .metrics()
+          .http()
+          .build();
+      router = new Router();
+      router.init(routerConfig);
+      router.setRouterId("routerId");
+      router.start();
+      stateStore = router.getStateStore();
+
+      membershipStore =
+          stateStore.getRegisteredRecordStore(MembershipStore.class);
+      routerStore = stateStore.getRegisteredRecordStore(RouterStore.class);
+
+      // Read all data and load all caches
+      waitStateStore(stateStore, 10000);
+      createFixtures();
+      stateStore.refreshCaches(true);
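+      // Brief pause so the refreshed caches settle before the test body runs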
+      Thread.sleep(1000);
+    }
+  }
+
+  @After
+  public void tearDownBase() throws IOException {
+    if (router != null) {
+      router.stop();
+      router.close();
+      router = null;
+    }
+  }
+
+  private void createFixtures() throws IOException {
+    // Clear all records
+    clearAllRecords(stateStore);
+
+    nameservices = new ArrayList<>();
+    nameservices.add(NAMESERVICES[0]);
+    nameservices.add(NAMESERVICES[1]);
+
+    // 2 NNs per NS
+    activeMemberships = new ArrayList<>();
+    standbyMemberships = new ArrayList<>();
+
+    for (String nameservice : nameservices) {
+      MembershipState namenode1 = createMockRegistrationForNamenode(
+          nameservice, NAMENODES[0], FederationNamenodeServiceState.ACTIVE);
+      NamenodeHeartbeatRequest request1 =
+          NamenodeHeartbeatRequest.newInstance(namenode1);
+      assertTrue(membershipStore.namenodeHeartbeat(request1).getResult());
+      activeMemberships.add(namenode1);
+
+      MembershipState namenode2 = createMockRegistrationForNamenode(
+          nameservice, NAMENODES[1], FederationNamenodeServiceState.STANDBY);
+      NamenodeHeartbeatRequest request2 =
+          NamenodeHeartbeatRequest.newInstance(namenode2);
+      assertTrue(membershipStore.namenodeHeartbeat(request2).getResult());
+      standbyMemberships.add(namenode2);
+    }
+
+    // Add mock mount table entries for the nameservices
+    mockMountTable = createMockMountTable(nameservices);
+    synchronizeRecords(stateStore, mockMountTable, MountTable.class);
+
+    // Add 2 router memberships in addition to the running router.
+    long t1 = Time.now();
+    mockRouters = new ArrayList<>();
+    RouterState router1 = RouterState.newInstance(
+        "router1", t1, RouterServiceState.RUNNING);
+    router1.setStateStoreVersion(StateStoreVersion.newInstance(
+        t1 - 1000, t1 - 2000));
+    RouterHeartbeatRequest heartbeatRequest =
+        RouterHeartbeatRequest.newInstance(router1);
+    assertTrue(routerStore.routerHeartbeat(heartbeatRequest).getStatus());
+
+    GetRouterRegistrationRequest getRequest =
+        GetRouterRegistrationRequest.newInstance("router1");
+    GetRouterRegistrationResponse getResponse =
+        routerStore.getRouterRegistration(getRequest);
+    RouterState routerState1 = getResponse.getRouter();
+    mockRouters.add(routerState1);
+
+    long t2 = Time.now();
+    RouterState router2 = RouterState.newInstance(
+        "router2", t2, RouterServiceState.RUNNING);
+    router2.setStateStoreVersion(StateStoreVersion.newInstance(
+        t2 - 6000, t2 - 7000));
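+    // Reuse the heartbeat and registration-lookup requests for router2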
+    heartbeatRequest.setRouter(router2);
+    assertTrue(routerStore.routerHeartbeat(heartbeatRequest).getStatus());
+    getRequest.setRouterId("router2");
+    getResponse = routerStore.getRouterRegistration(getRequest);
+    RouterState routerState2 = getResponse.getRouter();
+    mockRouters.add(routerState2);
+  }
+
+  protected Router getRouter() {
+    return router;
+  }
+
+  protected List<MountTable> getMockMountTable() {
+    return mockMountTable;
+  }
+
+  protected List<MembershipState> getActiveMemberships() {
+    return activeMemberships;
+  }
+
+  protected List<MembershipState> getStandbyMemberships() {
+    return standbyMemberships;
+  }
+
+  protected List<String> getNameservices() {
+    return nameservices;
+  }
+
+  protected List<RouterState> getMockRouters() {
+    return mockRouters;
+  }
+
+  protected StateStoreService getStateStore() {
+    return stateStore;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java
new file mode 100644
index 0000000..cb3b472
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java
@@ -0,0 +1,528 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the {@link MountTableStore} from the {@link Router}.
+ */
+public class TestMountTableResolver {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestMountTableResolver.class);
+
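+  // Small cache limit so the cache-eviction test can exercise cleanup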
+  private static final int TEST_MAX_CACHE_SIZE = 10;
+
+  private MountTableResolver mountTable;
+
+  private Map<String, String> getMountTableEntry(
+      String subcluster, String path) {
+    Map<String, String> ret = new HashMap<>();
+    ret.put(subcluster, path);
+    return ret;
+  }
+
+  /**
+   * Setup the mount table.
+   * / -> 1:/
+   * __tmp -> 2:/tmp
+   * __user -> 3:/user
+   * ____a -> 2:/user/test
+   * ______demo
+   * ________test
+   * __________a -> 1:/user/test
+   * __________b -> 3:/user/test
+   * ____b
+   * ______file1.txt -> 4:/user/file1.txt
+   * __usr
+   * ____bin -> 2:/bin
+   * __readonly -> 2:/tmp
+   *
+   * @throws IOException If it cannot set the mount table.
+   */
+  private void setupMountTable() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(
+        FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE, TEST_MAX_CACHE_SIZE);
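+    // When no mount entry (not even "/") covers a path, resolution falls
+    // back to the default nameservice "0"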
+    conf.setStrings(DFS_ROUTER_DEFAULT_NAMESERVICE, "0");
+    mountTable = new MountTableResolver(conf);
+
+    // Root mount point
+    Map<String, String> map = getMountTableEntry("1", "/");
+    mountTable.addEntry(MountTable.newInstance("/", map));
+
+    // /tmp
+    map = getMountTableEntry("2", "/");
+    mountTable.addEntry(MountTable.newInstance("/tmp", map));
+
+    // /user
+    map = getMountTableEntry("3", "/user");
+    mountTable.addEntry(MountTable.newInstance("/user", map));
+
+    // /usr/bin
+    map = getMountTableEntry("2", "/bin");
+    mountTable.addEntry(MountTable.newInstance("/usr/bin", map));
+
+    // /user/a
+    map = getMountTableEntry("2", "/user/test");
+    mountTable.addEntry(MountTable.newInstance("/user/a", map));
+
+    // /user/b/file1.txt
+    map = getMountTableEntry("4", "/user/file1.txt");
+    mountTable.addEntry(MountTable.newInstance("/user/b/file1.txt", map));
+
+    // /user/a/demo/test/a
+    map = getMountTableEntry("1", "/user/test");
+    mountTable.addEntry(MountTable.newInstance("/user/a/demo/test/a", map));
+
+    // /user/a/demo/test/b
+    map = getMountTableEntry("3", "/user/test");
+    mountTable.addEntry(MountTable.newInstance("/user/a/demo/test/b", map));
+
+    // /readonly
+    map = getMountTableEntry("2", "/tmp");
+    MountTable readOnlyEntry = MountTable.newInstance("/readonly", map);
+    readOnlyEntry.setReadOnly(true);
+    mountTable.addEntry(readOnlyEntry);
+  }
+
+  @Before
+  public void setup() throws IOException {
+    setupMountTable();
+  }
+
+  @Test
+  public void testDestination() throws IOException {
+
+    // Check files
+    assertEquals("1->/tesfile1.txt",
+        mountTable.getDestinationForPath("/tesfile1.txt").toString());
+
+    assertEquals("3->/user/testfile2.txt",
+        mountTable.getDestinationForPath("/user/testfile2.txt").toString());
+
+    assertEquals("2->/user/test/testfile3.txt",
+        mountTable.getDestinationForPath("/user/a/testfile3.txt").toString());
+
+    assertEquals("3->/user/b/testfile4.txt",
+        mountTable.getDestinationForPath("/user/b/testfile4.txt").toString());
+
+    assertEquals("1->/share/file5.txt",
+        mountTable.getDestinationForPath("/share/file5.txt").toString());
+
+    assertEquals("2->/bin/file7.txt",
+        mountTable.getDestinationForPath("/usr/bin/file7.txt").toString());
+
+    assertEquals("1->/usr/file8.txt",
+        mountTable.getDestinationForPath("/usr/file8.txt").toString());
+
+    assertEquals("2->/user/test/demo/file9.txt",
+        mountTable.getDestinationForPath("/user/a/demo/file9.txt").toString());
+
+    // Check folders
+    assertEquals("3->/user/testfolder",
+        mountTable.getDestinationForPath("/user/testfolder").toString());
+
+    assertEquals("2->/user/test/b",
+        mountTable.getDestinationForPath("/user/a/b").toString());
+
+    assertEquals("3->/user/test/a",
+        mountTable.getDestinationForPath("/user/test/a").toString());
+
+    assertEquals("2->/tmp/tesfile1.txt",
+        mountTable.getDestinationForPath("/readonly/tesfile1.txt").toString());
+
+  }
+
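+  // Assert that list1 contains exactly the entries in list2, ignoring order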
+  private void compareLists(List<String> list1, String[] list2) {
+    assertEquals(list1.size(), list2.length);
+    for (String item : list2) {
+      assertTrue(list1.contains(item));
+    }
+  }
+
+  @Test
+  public void testGetMountPoint() throws IOException {
+    // Check retrieving the mount table entry for a path
+    MountTable mtEntry;
+    mtEntry = mountTable.getMountPoint("/");
+    assertEquals("/", mtEntry.getSourcePath());
+
+    mtEntry = mountTable.getMountPoint("/user");
+    assertEquals("/user", mtEntry.getSourcePath());
+
+    mtEntry = mountTable.getMountPoint("/user/a");
+    assertEquals("/user/a", mtEntry.getSourcePath());
+
+    mtEntry = mountTable.getMountPoint("/user/a/");
+    assertEquals("/user/a", mtEntry.getSourcePath());
+
+    mtEntry = mountTable.getMountPoint("/user/a/11");
+    assertEquals("/user/a", mtEntry.getSourcePath());
+
+    mtEntry = mountTable.getMountPoint("/user/a1");
+    assertEquals("/user", mtEntry.getSourcePath());
+  }
+
+  @Test
+  public void testGetMountPoints() throws IOException {
+
+    // Check getting all mount points (virtual and real) beneath a path
+    List<String> mounts = mountTable.getMountPoints("/");
+    assertEquals(4, mounts.size());
+    compareLists(mounts, new String[] {"tmp", "user", "usr", "readonly"});
+
+    mounts = mountTable.getMountPoints("/user");
+    assertEquals(2, mounts.size());
+    compareLists(mounts, new String[] {"a", "b"});
+
+    mounts = mountTable.getMountPoints("/user/a");
+    assertEquals(1, mounts.size());
+    compareLists(mounts, new String[] {"demo"});
+
+    mounts = mountTable.getMountPoints("/user/a/demo");
+    assertEquals(1, mounts.size());
+    compareLists(mounts, new String[] {"test"});
+
+    mounts = mountTable.getMountPoints("/user/a/demo/test");
+    assertEquals(2, mounts.size());
+    compareLists(mounts, new String[] {"a", "b"});
+
+    mounts = mountTable.getMountPoints("/tmp");
+    assertEquals(0, mounts.size());
+
+    mounts = mountTable.getMountPoints("/t");
+    assertNull(mounts);
+
+    mounts = mountTable.getMountPoints("/unknownpath");
+    assertNull(mounts);
+  }
+
+  // Check that every path in list2 appears as a source path in list1
+  private void compareRecords(List<MountTable> list1, String[] list2) {
+    assertEquals(list1.size(), list2.length);
+    for (String item : list2) {
+      boolean found = false;
+      for (MountTable record : list1) {
+        if (record.getSourcePath().equals(item)) {
+          found = true;
+          break;
+        }
+      }
+      if (!found) {
+        fail("Mount table entry " + item + " is missing");
+      }
+    }
+  }
+
+  @Test
+  public void testGetMounts() throws IOException {
+
+    // Check listing the mount table records at or beneath a path
+    List<MountTable> records = mountTable.getMounts("/");
+    assertEquals(9, records.size());
+    compareRecords(records, new String[] {"/", "/tmp", "/user", "/usr/bin",
+        "user/a", "/user/a/demo/a", "/user/a/demo/b", "/user/b/file1.txt",
+        "readonly"});
+
+    records = mountTable.getMounts("/user");
+    assertEquals(5, records.size());
+    compareRecords(records, new String[] {"/user", "/user/a/demo/a",
+        "/user/a/demo/b", "user/a", "/user/b/file1.txt"});
+
+    records = mountTable.getMounts("/user/a");
+    assertEquals(3, records.size());
+    compareRecords(records,
+        new String[] {"/user/a/demo/test/a", "/user/a/demo/test/b", "/user/a"});
+
+    records = mountTable.getMounts("/tmp");
+    assertEquals(1, records.size());
+    compareRecords(records, new String[] {"/tmp"});
+
+    records = mountTable.getMounts("/readonly");
+    assertEquals(1, records.size());
+    compareRecords(records, new String[] {"/readonly"});
+    assertTrue(records.get(0).isReadOnly());
+  }
+
+  @Test
+  public void testRemoveSubTree()
+      throws UnsupportedOperationException, IOException {
+
+    // 4 mount points are present: /tmp, /user, /usr, /readonly
+    compareLists(mountTable.getMountPoints("/"),
+        new String[] {"user", "usr", "tmp", "readonly"});
+
+    // /tmp currently points to namespace 2
+    assertEquals("2", mountTable.getDestinationForPath("/tmp/testfile.txt")
+        .getDefaultLocation().getNameserviceId());
+
+    // Remove /tmp
+    mountTable.removeEntry("/tmp");
+
+    // Now 3 mount points are present: /user, /usr, /readonly
+    compareLists(mountTable.getMountPoints("/"),
+        new String[] {"user", "usr", "readonly"});
+
+    // /tmp no longer exists; paths fall back to the root mount on namespace 1
+    assertEquals("1", mountTable.getDestinationForPath("/tmp/testfile.txt")
+        .getDefaultLocation().getNameserviceId());
+  }
+
+  @Test
+  public void testRemoveVirtualNode()
+      throws UnsupportedOperationException, IOException {
+
+    // 4 mount points are present: /tmp, /user, /usr, /readonly
+    compareLists(mountTable.getMountPoints("/"),
+        new String[] {"user", "usr", "tmp", "readonly"});
+
+    // /usr is virtual, uses namespace 1->/
+    assertEquals("1", mountTable.getDestinationForPath("/usr/testfile.txt")
+        .getDefaultLocation().getNameserviceId());
+
+    // Attempt to remove /usr
+    mountTable.removeEntry("/usr");
+
+    // Verify the remove had no effect: /usr is a virtual node, not an entry
+    compareLists(mountTable.getMountPoints("/"),
+        new String[] {"user", "usr", "tmp", "readonly"});
+  }
+
+  @Test
+  public void testRemoveLeafNode()
+      throws UnsupportedOperationException, IOException {
+
+    // /user/a/demo/test/a currently points to namespace 1
+    assertEquals("1", mountTable.getDestinationForPath("/user/a/demo/test/a")
+        .getDefaultLocation().getNameserviceId());
+
+    // Remove /user/a/demo/test/a
+    mountTable.removeEntry("/user/a/demo/test/a");
+
+    // Now /user/a/demo/test/a points to namespace 2 using the entry for /user/a
+    assertEquals("2", mountTable.getDestinationForPath("/user/a/demo/test/a")
+        .getDefaultLocation().getNameserviceId());
+
+    // Verify the virtual node at /user/a/demo still exists and was not deleted
+    compareLists(mountTable.getMountPoints("/user/a"), new String[] {"demo"});
+
+    // Verify the sibling node was unaffected and still points to ns 3
+    assertEquals("3", mountTable.getDestinationForPath("/user/a/demo/test/b")
+        .getDefaultLocation().getNameserviceId());
+  }
+
+  @Test
+  public void testRefreshEntries()
+      throws UnsupportedOperationException, IOException {
+
+    // Initial table loaded
+    testDestination();
+    assertEquals(9, mountTable.getMounts("/").size());
+
+    // Replace table with /1 and /2
+    List<MountTable> records = new ArrayList<>();
+    Map<String, String> map1 = getMountTableEntry("1", "/");
+    records.add(MountTable.newInstance("/1", map1));
+    Map<String, String> map2 = getMountTableEntry("2", "/");
+    records.add(MountTable.newInstance("/2", map2));
+    mountTable.refreshEntries(records);
+
+    // Verify addition
+    PathLocation destination1 = mountTable.getDestinationForPath("/1");
+    RemoteLocation defaultLoc1 = destination1.getDefaultLocation();
+    assertEquals("1", defaultLoc1.getNameserviceId());
+
+    PathLocation destination2 = mountTable.getDestinationForPath("/2");
+    RemoteLocation defaultLoc2 = destination2.getDefaultLocation();
+    assertEquals("2", defaultLoc2.getNameserviceId());
+
+    // Verify existing entries were removed
+    assertEquals(2, mountTable.getMounts("/").size());
+    // Do not call fail() inside this try: it throws AssertionError itself
+    // and would be swallowed by the catch, masking a passing lookup
+    boolean assertionThrown = false;
+    try {
+      testDestination();
+    } catch (AssertionError e) {
+      // The old entries (including "/") were removed, so the lookups fail
+      assertionThrown = true;
+    }
+    assertTrue(assertionThrown);
+  }
+
+  @Test
+  public void testMountTableScalability() throws IOException {
+
+    List<MountTable> emptyList = new ArrayList<>();
+    mountTable.refreshEntries(emptyList);
+
+    // Add 100,000 entries in flat list
+    for (int i = 0; i < 100000; i++) {
+      Map<String, String> map = getMountTableEntry("1", "/" + i);
+      MountTable record = MountTable.newInstance("/" + i, map);
+      mountTable.addEntry(record);
+      if (i % 10000 == 0) {
+        LOG.info("Adding flat mount record {}: {}", i, record);
+      }
+    }
+
+    assertEquals(100000, mountTable.getMountPoints("/").size());
+    assertEquals(100000, mountTable.getMounts("/").size());
+
+    // Add 1000 entries in deep list
+    mountTable.refreshEntries(emptyList);
+    String parent = "/";
+    for (int i = 0; i < 1000; i++) {
+      Map<String, String> map = getMountTableEntry("1", "/" + i);
+      if (i > 0) {
+        parent = parent + "/";
+      }
+      parent = parent + i;
+      MountTable record = MountTable.newInstance(parent, map);
+      mountTable.addEntry(record);
+    }
+
+    assertEquals(1, mountTable.getMountPoints("/").size());
+    assertEquals(1000, mountTable.getMounts("/").size());
+
+    // Add 100,000 entries in deep and wide tree
+    mountTable.refreshEntries(emptyList);
+    Random rand = new Random();
+    parent = "/" + Integer.toString(rand.nextInt());
+    int numRootTrees = 1;
+    for (int i = 0; i < 100000; i++) {
+      Map<String, String> map = getMountTableEntry("1", "/" + i);
+      parent = parent + "/" + i;
+      if (parent.length() > 2000) {
+        // Start new tree
+        parent = "/" + Integer.toString(rand.nextInt());
+        numRootTrees++;
+      }
+      MountTable record = MountTable.newInstance(parent, map);
+      mountTable.addEntry(record);
+    }
+
+    assertEquals(numRootTrees, mountTable.getMountPoints("/").size());
+    assertEquals(100000, mountTable.getMounts("/").size());
+  }
+
+  @Test
+  public void testUpdate() throws IOException {
+
+    // Add entry to update later
+    Map<String, String> map = getMountTableEntry("1", "/");
+    mountTable.addEntry(MountTable.newInstance("/testupdate", map));
+
+    MountTable entry = mountTable.getMountPoint("/testupdate");
+    List<RemoteLocation> dests = entry.getDestinations();
+    assertEquals(1, dests.size());
+    RemoteLocation dest = dests.get(0);
+    assertEquals("1", dest.getNameserviceId());
+
+    // Update entry
+    Collection<MountTable> entries = Collections.singletonList(
+        MountTable.newInstance("/testupdate", getMountTableEntry("2", "/")));
+    mountTable.refreshEntries(entries);
+
+    MountTable entry1 = mountTable.getMountPoint("/testupdate");
+    List<RemoteLocation> dests1 = entry1.getDestinations();
+    assertEquals(1, dests1.size());
+    RemoteLocation dest1 = dests1.get(0);
+    assertEquals("2", dest1.getNameserviceId());
+
+    // Remove the entry and verify it is gone
+    mountTable.removeEntry("/testupdate");
+    MountTable entry2 = mountTable.getMountPoint("/testupdate");
+    assertNull(entry2);
+  }
+
+  @Test
+  public void testCacheCleaning() throws Exception {
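+    // Resolve many distinct paths; the location cache must stay within its cap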
+    for (int i = 0; i < 1000; i++) {
+      String filename = String.format("/user/a/file-%04d.txt", i);
+      mountTable.getDestinationForPath(filename);
+    }
+    long cacheSize = mountTable.getCacheSize();
+    assertTrue(cacheSize <= TEST_MAX_CACHE_SIZE);
+  }
+
+  @Test
+  public void testLocationCache() throws Exception {
+    List<MountTable> entries = new ArrayList<>();
+
+    // Add entry and test location cache
+    Map<String, String> map1 = getMountTableEntry("1", "/testlocationcache");
+    MountTable entry1 = MountTable.newInstance("/testlocationcache", map1);
+    entries.add(entry1);
+
+    Map<String, String> map2 = getMountTableEntry("2",
+            "/anothertestlocationcache");
+    MountTable entry2 = MountTable.newInstance("/anothertestlocationcache",
+            map2);
+    entries.add(entry2);
+    mountTable.refreshEntries(entries);
+    assertEquals("1->/testlocationcache",
+            mountTable.getDestinationForPath("/testlocationcache").toString());
+    assertEquals("2->/anothertestlocationcache",
+            mountTable.getDestinationForPath("/anothertestlocationcache")
+                    .toString());
+
+    // Remove entry1
+    entries.remove(entry1);
+    mountTable.refreshEntries(entries);
+
+    // With entry1 gone, the path falls back to the default nameservice "0"
+    assertEquals("0->/testlocationcache",
+            mountTable.getDestinationForPath("/testlocationcache").toString());
+
+    // Add the entry again but mount to another ns
+    Map<String, String> map3 = getMountTableEntry("3", "/testlocationcache");
+    MountTable entry3 = MountTable.newInstance("/testlocationcache", map3);
+    entries.add(entry3);
+    mountTable.refreshEntries(entries);
+
+    // Ensure the location cache updated correctly
+    assertEquals("3->/testlocationcache",
+            mountTable.getDestinationForPath("/testlocationcache").toString());
+
+    // Cleanup before exit
+    mountTable.removeEntry("/testlocationcache");
+    mountTable.removeEntry("/anothertestlocationcache");
+  }
+}
\ No newline at end of file

