http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java new file mode 100644 index 0000000..9788683 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java @@ -0,0 +1,1005 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.addDirectory; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.waitNamenodeRegistered; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HTTPS_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HTTP_BIND_HOST_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf; +import org.apache.hadoop.hdfs.MiniDFSNNTopology.NSConf; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.router.RouterClient; +import org.apache.hadoop.hdfs.server.namenode.FSImage; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.Service.STATE; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test utility to mimic a federated HDFS cluster with multiple routers. + */ +public class RouterDFSCluster { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterDFSCluster.class); + + public static final String TEST_STRING = "teststring"; + public static final String TEST_DIR = "testdir"; + public static final String TEST_FILE = "testfile"; + + + /** Nameservices in the federated cluster. */ + private List<String> nameservices; + /** Namenodes in the federated cluster. */ + private List<NamenodeContext> namenodes; + /** Routers in the federated cluster. */ + private List<RouterContext> routers; + /** If the Namenodes are in high availability.*/ + private boolean highAvailability; + /** Number of datanodes per nameservice. */ + private int numDatanodesPerNameservice = 2; + + /** Mini cluster. */ + private MiniDFSCluster cluster; + + protected static final long DEFAULT_HEARTBEAT_INTERVAL_MS = + TimeUnit.SECONDS.toMillis(5); + protected static final long DEFAULT_CACHE_INTERVAL_MS = + TimeUnit.SECONDS.toMillis(5); + /** Heartbeat interval in milliseconds. */ + private long heartbeatInterval; + /** Cache flush interval in milliseconds. */ + private long cacheFlushInterval; + + /** Router configuration overrides. */ + private Configuration routerOverrides; + /** Namenode configuration overrides. */ + private Configuration namenodeOverrides; + + + /** + * Router context. + */ + public class RouterContext { + private Router router; + private FileContext fileContext; + private String nameserviceId; + private String namenodeId; + private int rpcPort; + private int httpPort; + private DFSClient client; + private Configuration conf; + private RouterClient adminClient; + private URI fileSystemUri; + + public RouterContext(Configuration conf, String nsId, String nnId) { + this.conf = conf; + this.nameserviceId = nsId; + this.namenodeId = nnId; + + this.router = new Router(); + this.router.init(conf); + } + + public Router getRouter() { + return this.router; + } + + public String getNameserviceId() { + return this.nameserviceId; + } + + public String getNamenodeId() { + return this.namenodeId; + } + + public int getRpcPort() { + return this.rpcPort; + } + + public int getHttpPort() { + return this.httpPort; + } + + public FileContext getFileContext() { + return this.fileContext; + } + + public void initRouter() throws URISyntaxException { + // Store the bound points for the router interfaces + InetSocketAddress rpcAddress = router.getRpcServerAddress(); + if (rpcAddress != null) { + this.rpcPort = rpcAddress.getPort(); + this.fileSystemUri = + URI.create("hdfs://" + NetUtils.getHostPortString(rpcAddress)); + // Override the default FS to point to the router RPC + DistributedFileSystem.setDefaultUri(conf, fileSystemUri); + try { + this.fileContext = FileContext.getFileContext(conf); + } catch (UnsupportedFileSystemException e) { + this.fileContext = null; + } + } + InetSocketAddress httpAddress = router.getHttpServerAddress(); + if (httpAddress != null) { + this.httpPort = httpAddress.getPort(); + } + } + + public FileSystem getFileSystem() throws IOException { + return DistributedFileSystem.get(conf); + } + + public DFSClient getClient(UserGroupInformation user) + throws IOException, URISyntaxException, InterruptedException { + + LOG.info("Connecting to router at {}", fileSystemUri); + return user.doAs(new PrivilegedExceptionAction<DFSClient>() { + @Override + public DFSClient run() throws IOException { + return new DFSClient(fileSystemUri, conf); + } + }); + } + + public RouterClient getAdminClient() throws IOException { + if (adminClient == null) { + InetSocketAddress routerSocket = router.getAdminServerAddress(); + LOG.info("Connecting to router admin at {}", routerSocket); + adminClient = new RouterClient(routerSocket, conf); + } + return adminClient; + } + + public DFSClient getClient() throws IOException, URISyntaxException { + if (client == null) { + LOG.info("Connecting to router at {}", fileSystemUri); + client = new DFSClient(fileSystemUri, conf); + } + return client; + } + } + + /** + * Namenode context in the federated cluster. + */ + public class NamenodeContext { + private Configuration conf; + private NameNode namenode; + private String nameserviceId; + private String namenodeId; + private FileContext fileContext; + private int rpcPort; + private int servicePort; + private int lifelinePort; + private int httpPort; + private URI fileSystemUri; + private int index; + private DFSClient client; + + public NamenodeContext( + Configuration conf, String nsId, String nnId, int index) { + this.conf = conf; + this.nameserviceId = nsId; + this.namenodeId = nnId; + this.index = index; + } + + public NameNode getNamenode() { + return this.namenode; + } + + public String getNameserviceId() { + return this.nameserviceId; + } + + public String getNamenodeId() { + return this.namenodeId; + } + + public FileContext getFileContext() { + return this.fileContext; + } + + public void setNamenode(NameNode nn) throws URISyntaxException { + this.namenode = nn; + + // Store the bound ports and override the default FS with the local NN RPC + this.rpcPort = nn.getNameNodeAddress().getPort(); + this.servicePort = nn.getServiceRpcAddress().getPort(); + this.lifelinePort = nn.getServiceRpcAddress().getPort(); + this.httpPort = nn.getHttpAddress().getPort(); + this.fileSystemUri = new URI("hdfs://" + namenode.getHostAndPort()); + DistributedFileSystem.setDefaultUri(this.conf, this.fileSystemUri); + + try { + this.fileContext = FileContext.getFileContext(this.conf); + } catch (UnsupportedFileSystemException e) { + this.fileContext = null; + } + } + + public String getRpcAddress() { + return namenode.getNameNodeAddress().getHostName() + ":" + rpcPort; + } + + public String getServiceAddress() { + return namenode.getServiceRpcAddress().getHostName() + ":" + servicePort; + } + + public String getLifelineAddress() { + return namenode.getServiceRpcAddress().getHostName() + ":" + lifelinePort; + } + + public String getHttpAddress() { + return namenode.getHttpAddress().getHostName() + ":" + httpPort; + } + + public FileSystem getFileSystem() throws IOException { + return DistributedFileSystem.get(conf); + } + + public void resetClient() { + client = null; + } + + public DFSClient getClient(UserGroupInformation user) + throws IOException, URISyntaxException, InterruptedException { + + LOG.info("Connecting to namenode at {}", fileSystemUri); + return user.doAs(new PrivilegedExceptionAction<DFSClient>() { + @Override + public DFSClient run() throws IOException { + return new DFSClient(fileSystemUri, conf); + } + }); + } + + public DFSClient getClient() throws IOException, URISyntaxException { + if (client == null) { + LOG.info("Connecting to namenode at {}", fileSystemUri); + client = new DFSClient(fileSystemUri, conf); + } + return client; + } + + public String getConfSuffix() { + String suffix = nameserviceId; + if (highAvailability) { + suffix += "." + namenodeId; + } + return suffix; + } + } + + public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes, + long heartbeatInterval, long cacheFlushInterval) { + this.highAvailability = ha; + this.heartbeatInterval = heartbeatInterval; + this.cacheFlushInterval = cacheFlushInterval; + configureNameservices(numNameservices, numNamenodes); + } + + public RouterDFSCluster(boolean ha, int numNameservices) { + this(ha, numNameservices, 2, + DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS); + } + + public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes) { + this(ha, numNameservices, numNamenodes, + DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS); + } + + /** + * Add configuration settings to override default Router settings. + * + * @param conf Router configuration overrides. + */ + public void addRouterOverrides(Configuration conf) { + if (this.routerOverrides == null) { + this.routerOverrides = conf; + } else { + this.routerOverrides.addResource(conf); + } + } + + /** + * Add configuration settings to override default Namenode settings. + * + * @param conf Namenode configuration overrides. + */ + public void addNamenodeOverrides(Configuration conf) { + if (this.namenodeOverrides == null) { + this.namenodeOverrides = conf; + } else { + this.namenodeOverrides.addResource(conf); + } + } + + /** + * Generate the configuration for a client. + * + * @param nsId Nameservice identifier. + * @return New namenode configuration. + */ + public Configuration generateNamenodeConfiguration(String nsId) { + Configuration conf = new HdfsConfiguration(); + + conf.set(DFS_NAMESERVICES, getNameservicesKey()); + conf.set(FS_DEFAULT_NAME_KEY, "hdfs://" + nsId); + + for (String ns : nameservices) { + if (highAvailability) { + conf.set( + DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, + NAMENODES[0] + "," + NAMENODES[1]); + } + + for (NamenodeContext context : getNamenodes(ns)) { + String suffix = context.getConfSuffix(); + + conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix, + "127.0.0.1:" + context.rpcPort); + conf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix, + "127.0.0.1:" + context.httpPort); + conf.set(DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + suffix, + "0.0.0.0"); + + // If the service port is enabled by default, we need to set them up + boolean servicePortEnabled = false; + if (servicePortEnabled) { + conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + "." + suffix, + "127.0.0.1:" + context.servicePort); + conf.set(DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY + "." + suffix, + "0.0.0.0"); + } + } + } + + if (this.namenodeOverrides != null) { + conf.addResource(this.namenodeOverrides); + } + return conf; + } + + /** + * Generate the configuration for a client. + * + * @return New configuration for a client. + */ + public Configuration generateClientConfiguration() { + Configuration conf = new HdfsConfiguration(false); + String ns0 = getNameservices().get(0); + conf.addResource(generateNamenodeConfiguration(ns0)); + return conf; + } + + /** + * Generate the configuration for a Router. + * + * @param nsId Nameservice identifier. + * @param nnId Namenode identifier. + * @return New configuration for a Router. + */ + public Configuration generateRouterConfiguration(String nsId, String nnId) { + + Configuration conf = new HdfsConfiguration(false); + conf.addResource(generateNamenodeConfiguration(nsId)); + + conf.setInt(DFS_ROUTER_HANDLER_COUNT_KEY, 10); + conf.set(DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0"); + + conf.set(DFS_ROUTER_ADMIN_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(DFS_ROUTER_ADMIN_BIND_HOST_KEY, "0.0.0.0"); + + conf.set(DFS_ROUTER_HTTP_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(DFS_ROUTER_HTTPS_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(DFS_ROUTER_HTTP_BIND_HOST_KEY, "0.0.0.0"); + + conf.set(DFS_ROUTER_DEFAULT_NAMESERVICE, nameservices.get(0)); + conf.setLong(DFS_ROUTER_HEARTBEAT_INTERVAL_MS, heartbeatInterval); + conf.setLong(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, cacheFlushInterval); + + // Use mock resolver classes + conf.setClass(FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, + MockResolver.class, ActiveNamenodeResolver.class); + conf.setClass(FEDERATION_FILE_RESOLVER_CLIENT_CLASS, + MockResolver.class, FileSubclusterResolver.class); + + // Set the nameservice ID for the default NN monitor + conf.set(DFS_NAMESERVICE_ID, nsId); + if (nnId != null) { + conf.set(DFS_HA_NAMENODE_ID_KEY, nnId); + } + + // Namenodes to monitor + StringBuilder sb = new StringBuilder(); + for (String ns : this.nameservices) { + for (NamenodeContext context : getNamenodes(ns)) { + String suffix = context.getConfSuffix(); + if (sb.length() != 0) { + sb.append(","); + } + sb.append(suffix); + } + } + conf.set(DFS_ROUTER_MONITOR_NAMENODE, sb.toString()); + + // Add custom overrides if available + if (this.routerOverrides != null) { + for (Entry<String, String> entry : this.routerOverrides) { + String confKey = entry.getKey(); + String confValue = entry.getValue(); + conf.set(confKey, confValue); + } + } + return conf; + } + + public void configureNameservices(int numNameservices, int numNamenodes) { + this.nameservices = new ArrayList<>(); + this.namenodes = new ArrayList<>(); + + NamenodeContext context = null; + int nnIndex = 0; + for (int i=0; i<numNameservices; i++) { + String ns = "ns" + i; + this.nameservices.add("ns" + i); + + Configuration nnConf = generateNamenodeConfiguration(ns); + if (!highAvailability) { + context = new NamenodeContext(nnConf, ns, null, nnIndex++); + this.namenodes.add(context); + } else { + for (int j=0; j<numNamenodes; j++) { + context = new NamenodeContext(nnConf, ns, NAMENODES[j], nnIndex++); + this.namenodes.add(context); + } + } + } + } + + public void setNumDatanodesPerNameservice(int num) { + this.numDatanodesPerNameservice = num; + } + + public String getNameservicesKey() { + StringBuilder sb = new StringBuilder(); + for (String nsId : this.nameservices) { + if (sb.length() > 0) { + sb.append(","); + } + sb.append(nsId); + } + return sb.toString(); + } + + public String getRandomNameservice() { + Random r = new Random(); + int randIndex = r.nextInt(nameservices.size()); + return nameservices.get(randIndex); + } + + public List<String> getNameservices() { + return nameservices; + } + + public List<NamenodeContext> getNamenodes(String nameservice) { + List<NamenodeContext> nns = new ArrayList<>(); + for (NamenodeContext c : namenodes) { + if (c.nameserviceId.equals(nameservice)) { + nns.add(c); + } + } + return nns; + } + + public NamenodeContext getRandomNamenode() { + Random rand = new Random(); + int i = rand.nextInt(this.namenodes.size()); + return this.namenodes.get(i); + } + + public List<NamenodeContext> getNamenodes() { + return this.namenodes; + } + + public boolean isHighAvailability() { + return highAvailability; + } + + public NamenodeContext getNamenode(String nameservice, String namenode) { + for (NamenodeContext c : this.namenodes) { + if (c.nameserviceId.equals(nameservice)) { + if (namenode == null || namenode.isEmpty() || + c.namenodeId == null || c.namenodeId.isEmpty()) { + return c; + } else if (c.namenodeId.equals(namenode)) { + return c; + } + } + } + return null; + } + + public List<RouterContext> getRouters(String nameservice) { + List<RouterContext> nns = new ArrayList<>(); + for (RouterContext c : routers) { + if (c.nameserviceId.equals(nameservice)) { + nns.add(c); + } + } + return nns; + } + + public RouterContext getRouterContext(String nsId, String nnId) { + for (RouterContext c : routers) { + if (nnId == null) { + return c; + } + if (c.namenodeId.equals(nnId) && + c.nameserviceId.equals(nsId)) { + return c; + } + } + return null; + } + + public RouterContext getRandomRouter() { + Random rand = new Random(); + return routers.get(rand.nextInt(routers.size())); + } + + public List<RouterContext> getRouters() { + return routers; + } + + public RouterContext buildRouter(String nsId, String nnId) + throws URISyntaxException, IOException { + Configuration config = generateRouterConfiguration(nsId, nnId); + RouterContext rc = new RouterContext(config, nsId, nnId); + return rc; + } + + public void startCluster() { + startCluster(null); + } + + public void startCluster(Configuration overrideConf) { + try { + MiniDFSNNTopology topology = new MiniDFSNNTopology(); + for (String ns : nameservices) { + NSConf conf = new MiniDFSNNTopology.NSConf(ns); + if (highAvailability) { + for (int i=0; i<namenodes.size()/nameservices.size(); i++) { + NNConf nnConf = new MiniDFSNNTopology.NNConf("nn" + i); + conf.addNN(nnConf); + } + } else { + NNConf nnConf = new MiniDFSNNTopology.NNConf(null); + conf.addNN(nnConf); + } + topology.addNameservice(conf); + } + topology.setFederation(true); + + // Start mini DFS cluster + String ns0 = nameservices.get(0); + Configuration nnConf = generateNamenodeConfiguration(ns0); + if (overrideConf != null) { + nnConf.addResource(overrideConf); + } + cluster = new MiniDFSCluster.Builder(nnConf) + .numDataNodes(nameservices.size() * numDatanodesPerNameservice) + .nnTopology(topology) + .build(); + cluster.waitActive(); + + // Store NN pointers + for (int i = 0; i < namenodes.size(); i++) { + NameNode nn = cluster.getNameNode(i); + namenodes.get(i).setNamenode(nn); + } + + } catch (Exception e) { + LOG.error("Cannot start Router DFS cluster: {}", e.getMessage(), e); + if (cluster != null) { + cluster.shutdown(); + } + } + } + + public void startRouters() + throws InterruptedException, URISyntaxException, IOException { + + // Create one router per nameservice + this.routers = new ArrayList<>(); + for (String ns : this.nameservices) { + for (NamenodeContext context : getNamenodes(ns)) { + RouterContext router = buildRouter(ns, context.namenodeId); + this.routers.add(router); + } + } + + // Start all routers + for (RouterContext router : this.routers) { + router.router.start(); + } + + // Wait until all routers are active and record their ports + for (RouterContext router : this.routers) { + waitActive(router); + router.initRouter(); + } + } + + public void waitActive(NamenodeContext nn) throws IOException { + cluster.waitActive(nn.index); + } + + public void waitActive(RouterContext router) + throws InterruptedException { + for (int loopCount = 0; loopCount < 20; loopCount++) { + // Validate connection of routers to NNs + if (router.router.getServiceState() == STATE.STARTED) { + return; + } + Thread.sleep(1000); + } + fail("Timeout waiting for " + router.router + " to activate"); + } + + public void registerNamenodes() throws IOException { + for (RouterContext r : this.routers) { + ActiveNamenodeResolver resolver = r.router.getNamenodeResolver(); + for (NamenodeContext nn : this.namenodes) { + // Generate a report + NamenodeStatusReport report = new NamenodeStatusReport( + nn.nameserviceId, nn.namenodeId, + nn.getRpcAddress(), nn.getServiceAddress(), + nn.getLifelineAddress(), nn.getHttpAddress()); + FSImage fsImage = nn.namenode.getNamesystem().getFSImage(); + NamespaceInfo nsInfo = fsImage.getStorage().getNamespaceInfo(); + report.setNamespaceInfo(nsInfo); + + // Determine HA state from nn public state string + String nnState = nn.namenode.getState(); + HAServiceState haState = HAServiceState.ACTIVE; + for (HAServiceState state : HAServiceState.values()) { + if (nnState.equalsIgnoreCase(state.name())) { + haState = state; + break; + } + } + report.setHAServiceState(haState); + + // Register with the resolver + resolver.registerNamenode(report); + } + } + } + + public void waitNamenodeRegistration() throws Exception { + for (RouterContext r : this.routers) { + Router router = r.router; + for (NamenodeContext nn : this.namenodes) { + ActiveNamenodeResolver nnResolver = router.getNamenodeResolver(); + waitNamenodeRegistered( + nnResolver, nn.nameserviceId, nn.namenodeId, null); + } + } + } + + public void waitRouterRegistrationQuorum(RouterContext router, + FederationNamenodeServiceState state, String nsId, String nnId) + throws Exception { + LOG.info("Waiting for NN {} {} to transition to {}", nsId, nnId, state); + ActiveNamenodeResolver nnResolver = router.router.getNamenodeResolver(); + waitNamenodeRegistered(nnResolver, nsId, nnId, state); + } + + /** + * Get the federated path for a nameservice. + * @param nsId Nameservice identifier. + * @return Path in the Router. + */ + public String getFederatedPathForNS(String nsId) { + return "/" + nsId; + } + + /** + * Get the namenode path for a nameservice. + * @param nsId Nameservice identifier. + * @return Path in the Namenode. + */ + public String getNamenodePathForNS(String nsId) { + return "/target-" + nsId; + } + + /** + * Get the federated test directory for a nameservice. + * @param nsId Nameservice identifier. + * @return Example: + * <ul> + * <li>/ns0/testdir which maps to ns0->/target-ns0/testdir + * </ul> + */ + public String getFederatedTestDirectoryForNS(String nsId) { + return getFederatedPathForNS(nsId) + "/" + TEST_DIR; + } + + /** + * Get the namenode test directory for a nameservice. + * @param nsId Nameservice identifier. + * @return example: + * <ul> + * <li>/target-ns0/testdir + * </ul> + */ + public String getNamenodeTestDirectoryForNS(String nsId) { + return getNamenodePathForNS(nsId) + "/" + TEST_DIR; + } + + /** + * Get the federated test file for a nameservice. + * @param nsId Nameservice identifier. + * @return example: + * <ul> + * <li>/ns0/testfile which maps to ns0->/target-ns0/testfile + * </ul> + */ + public String getFederatedTestFileForNS(String nsId) { + return getFederatedPathForNS(nsId) + "/" + TEST_FILE; + } + + /** + * Get the namenode test file for a nameservice. + * @param nsId Nameservice identifier. + * @return example: + * <ul> + * <li>/target-ns0/testfile + * </ul> + */ + public String getNamenodeTestFileForNS(String nsId) { + return getNamenodePathForNS(nsId) + "/" + TEST_FILE; + } + + /** + * Switch a namenode in a nameservice to be the active. + * @param nsId Nameservice identifier. + * @param nnId Namenode identifier. + */ + public void switchToActive(String nsId, String nnId) { + try { + int total = cluster.getNumNameNodes(); + NameNodeInfo[] nns = cluster.getNameNodeInfos(); + for (int i = 0; i < total; i++) { + NameNodeInfo nn = nns[i]; + if (nn.getNameserviceId().equals(nsId) && + nn.getNamenodeId().equals(nnId)) { + cluster.transitionToActive(i); + } + } + } catch (Throwable e) { + LOG.error("Cannot transition to active", e); + } + } + + /** + * Switch a namenode in a nameservice to be in standby. + * @param nsId Nameservice identifier. + * @param nnId Namenode identifier. + */ + public void switchToStandby(String nsId, String nnId) { + try { + int total = cluster.getNumNameNodes(); + NameNodeInfo[] nns = cluster.getNameNodeInfos(); + for (int i = 0; i < total; i++) { + NameNodeInfo nn = nns[i]; + if (nn.getNameserviceId().equals(nsId) && + nn.getNamenodeId().equals(nnId)) { + cluster.transitionToStandby(i); + } + } + } catch (Throwable e) { + LOG.error("Cannot transition to standby", e); + } + } + + /** + * Stop the federated HDFS cluster. + */ + public void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + if (routers != null) { + for (RouterContext context : routers) { + stopRouter(context); + } + } + } + + /** + * Stop a router. + * @param router Router context. + */ + public void stopRouter(RouterContext router) { + try { + router.router.shutDown(); + + int loopCount = 0; + while (router.router.getServiceState() != STATE.STOPPED) { + loopCount++; + Thread.sleep(1000); + if (loopCount > 20) { + LOG.error("Cannot shutdown router {}", router.rpcPort); + break; + } + } + } catch (InterruptedException e) { + } + } + + ///////////////////////////////////////////////////////////////////////////// + // Namespace Test Fixtures + ///////////////////////////////////////////////////////////////////////////// + + /** + * Creates test directories via the namenode. + * 1) /target-ns0/testfile + * 2) /target-ns1/testfile + * @throws IOException + */ + public void createTestDirectoriesNamenode() throws IOException { + // Add a test dir to each NS and verify + for (String ns : getNameservices()) { + NamenodeContext context = getNamenode(ns, null); + if (!createTestDirectoriesNamenode(context)) { + throw new IOException("Cannot create test directory for ns " + ns); + } + } + } + + public boolean createTestDirectoriesNamenode(NamenodeContext nn) + throws IOException { + FileSystem fs = nn.getFileSystem(); + String testDir = getNamenodeTestDirectoryForNS(nn.nameserviceId); + return addDirectory(fs, testDir); + } + + public void deleteAllFiles() throws IOException { + // Delete all files via the NNs and verify + for (NamenodeContext context : getNamenodes()) { + FileSystem fs = context.getFileSystem(); + FileStatus[] status = fs.listStatus(new Path("/")); + for (int i = 0; i <status.length; i++) { + Path p = status[i].getPath(); + fs.delete(p, true); + } + status = fs.listStatus(new Path("/")); + assertEquals(status.length, 0); + } + } + + ///////////////////////////////////////////////////////////////////////////// + // MockRouterResolver Test Fixtures + ///////////////////////////////////////////////////////////////////////////// + + /** + * <ul> + * <li>/ -> [ns0->/]. + * <li>/nso -> ns0->/target-ns0. + * <li>/ns1 -> ns1->/target-ns1. + * </ul> + */ + public void installMockLocations() { + for (RouterContext r : routers) { + MockResolver resolver = + (MockResolver) r.router.getSubclusterResolver(); + // create table entries + for (String nsId : nameservices) { + // Direct path + String routerPath = getFederatedPathForNS(nsId); + String nnPath = getNamenodePathForNS(nsId); + resolver.addLocation(routerPath, nsId, nnPath); + } + + // Root path points to both first nameservice + String ns0 = nameservices.get(0); + resolver.addLocation("/", ns0, "/"); + } + } + + public MiniDFSCluster getCluster() { + return cluster; + } + + /** + * Wait until the federated cluster is up and ready. + * @throws IOException If we cannot wait for the cluster to be up. + */ + public void waitClusterUp() throws IOException { + cluster.waitClusterUp(); + registerNamenodes(); + try { + waitNamenodeRegistration(); + } catch (Exception e) { + throw new IOException("Cannot wait for the namenodes", e); + } + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java new file mode 100644 index 0000000..aa1906f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation; + +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockRegistrationForNamenode; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; + +/** + * Test utility to mimic a federated HDFS cluster with a router and a state + * store. + */ +public class StateStoreDFSCluster extends RouterDFSCluster { + + private static final Class<?> DEFAULT_FILE_RESOLVER = + MountTableResolver.class; + private static final Class<?> DEFAULT_NAMENODE_RESOLVER = + MembershipNamenodeResolver.class; + + public StateStoreDFSCluster(boolean ha, int numNameservices, int numNamenodes, + long heartbeatInterval, long cacheFlushInterval) + throws IOException, InterruptedException { + this(ha, numNameservices, numNamenodes, heartbeatInterval, + cacheFlushInterval, DEFAULT_FILE_RESOLVER); + } + + public StateStoreDFSCluster(boolean ha, int numNameservices, int numNamenodes, + long heartbeatInterval, long cacheFlushInterval, Class<?> fileResolver) + throws IOException, InterruptedException { + super(ha, numNameservices, numNamenodes, heartbeatInterval, + cacheFlushInterval); + + // Attach state store and resolvers to router + Configuration stateStoreConfig = getStateStoreConfiguration(); + // Use state store backed resolvers + stateStoreConfig.setClass( + RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, + DEFAULT_NAMENODE_RESOLVER, ActiveNamenodeResolver.class); + stateStoreConfig.setClass( + RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS, + fileResolver, FileSubclusterResolver.class); + this.addRouterOverrides(stateStoreConfig); + } + + public StateStoreDFSCluster(boolean ha, int numNameservices, + Class<?> fileResolver) throws IOException, InterruptedException { + this(ha, numNameservices, 2, + DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS, fileResolver); + } + + public StateStoreDFSCluster(boolean ha, int numNameservices) + throws IOException, InterruptedException { + this(ha, numNameservices, 2, + DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS); + } + + public StateStoreDFSCluster(boolean ha, int numNameservices, + int numNamnodes) throws IOException, InterruptedException { + this(ha, numNameservices, numNamnodes, + DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS); + } + + ///////////////////////////////////////////////////////////////////////////// + // State Store Test Fixtures + ///////////////////////////////////////////////////////////////////////////// + + /** + * Adds test fixtures for NN registation for each NN nameservice -> NS + * namenode -> NN rpcAddress -> 0.0.0.0:0 webAddress -> 0.0.0.0:0 state -> + * STANDBY safeMode -> false blockPool -> test. + * + * @param stateStore State Store. + * @throws IOException If it cannot register. + */ + public void createTestRegistration(StateStoreService stateStore) + throws IOException { + List<MembershipState> entries = new ArrayList<MembershipState>(); + for (NamenodeContext nn : this.getNamenodes()) { + MembershipState entry = createMockRegistrationForNamenode( + nn.getNameserviceId(), nn.getNamenodeId(), + FederationNamenodeServiceState.STANDBY); + entries.add(entry); + } + synchronizeRecords( + stateStore, entries, MembershipState.class); + } + + public void createTestMountTable(StateStoreService stateStore) + throws IOException { + List<MountTable> mounts = generateMockMountTable(); + synchronizeRecords(stateStore, mounts, MountTable.class); + stateStore.refreshCaches(); + } + + public List<MountTable> generateMockMountTable() throws IOException { + // create table entries + List<MountTable> entries = new ArrayList<>(); + for (String ns : this.getNameservices()) { + Map<String, String> destMap = new HashMap<>(); + destMap.put(ns, getNamenodePathForNS(ns)); + + // Direct path + String fedPath = getFederatedPathForNS(ns); + MountTable entry = MountTable.newInstance(fedPath, destMap); + entries.add(entry); + } + + // Root path goes to nameservice 1 + Map<String, String> destMap = new HashMap<>(); + String ns0 = this.getNameservices().get(0); + destMap.put(ns0, "/"); + MountTable entry = MountTable.newInstance("/", destMap); + entries.add(entry); + return entries; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java new file mode 100644 index 0000000..94799f3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getBean; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import javax.management.MalformedObjectNameException; + +import org.apache.commons.collections.ListUtils; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; +import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.junit.Test; + +/** + * Test the JMX interface for the {@link Router}. + */ +public class TestFederationMetrics extends TestMetricsBase { + + public static final String FEDERATION_BEAN = + "Hadoop:service=Router,name=FederationState"; + public static final String STATE_STORE_BEAN = + "Hadoop:service=Router,name=StateStore"; + public static final String RPC_BEAN = + "Hadoop:service=Router,name=FederationRPC"; + + @Test + public void testClusterStatsJMX() + throws MalformedObjectNameException, IOException { + + FederationMBean bean = getBean(FEDERATION_BEAN, FederationMBean.class); + validateClusterStatsBean(bean); + } + + @Test + public void testClusterStatsDataSource() throws IOException { + FederationMetrics metrics = getRouter().getMetrics(); + validateClusterStatsBean(metrics); + } + + @Test + public void testMountTableStatsDataSource() + throws IOException, JSONException { + + FederationMetrics metrics = getRouter().getMetrics(); + String jsonString = metrics.getMountTable(); + JSONArray jsonArray = new JSONArray(jsonString); + assertEquals(jsonArray.length(), getMockMountTable().size()); + + int match = 0; + for (int i = 0; i < jsonArray.length(); i++) { + JSONObject json = jsonArray.getJSONObject(i); + String src = json.getString("sourcePath"); + + for (MountTable entry : getMockMountTable()) { + if (entry.getSourcePath().equals(src)) { + assertEquals(entry.getDefaultLocation().getNameserviceId(), + json.getString("nameserviceId")); + assertEquals(entry.getDefaultLocation().getDest(), + json.getString("path")); + assertEquals(entry.getOwnerName(), json.getString("ownerName")); + assertEquals(entry.getGroupName(), json.getString("groupName")); + assertEquals(entry.getMode().toString(), json.getString("mode")); + assertEquals(entry.getQuota().toString(), json.getString("quota")); + assertNotNullAndNotEmpty(json.getString("dateCreated")); + assertNotNullAndNotEmpty(json.getString("dateModified")); + match++; + } + } + } + assertEquals(match, getMockMountTable().size()); + } + + private MembershipState findMockNamenode(String nsId, String nnId) { + + @SuppressWarnings("unchecked") + List<MembershipState> namenodes = + ListUtils.union(getActiveMemberships(), getStandbyMemberships()); + for (MembershipState nn : namenodes) { + if (nn.getNamenodeId().equals(nnId) + && nn.getNameserviceId().equals(nsId)) { + return nn; + } + } + return null; + } + + @Test + public void testNamenodeStatsDataSource() throws IOException, JSONException { + + FederationMetrics metrics = getRouter().getMetrics(); + String jsonString = metrics.getNamenodes(); + JSONObject jsonObject = new JSONObject(jsonString); + Iterator<?> keys = jsonObject.keys(); + int nnsFound = 0; + while (keys.hasNext()) { + // Validate each entry against our mocks + JSONObject json = jsonObject.getJSONObject((String) keys.next()); + String nameserviceId = json.getString("nameserviceId"); + String namenodeId = json.getString("namenodeId"); + + MembershipState mockEntry = + this.findMockNamenode(nameserviceId, namenodeId); + assertNotNull(mockEntry); + + assertEquals(json.getString("state"), mockEntry.getState().toString()); + MembershipStats stats = mockEntry.getStats(); + assertEquals(json.getLong("numOfActiveDatanodes"), + stats.getNumOfActiveDatanodes()); + assertEquals(json.getLong("numOfDeadDatanodes"), + stats.getNumOfDeadDatanodes()); + assertEquals(json.getLong("numOfDecommissioningDatanodes"), + stats.getNumOfDecommissioningDatanodes()); + assertEquals(json.getLong("numOfDecomActiveDatanodes"), + stats.getNumOfDecomActiveDatanodes()); + assertEquals(json.getLong("numOfDecomDeadDatanodes"), + stats.getNumOfDecomDeadDatanodes()); + assertEquals(json.getLong("numOfBlocks"), stats.getNumOfBlocks()); + assertEquals(json.getString("rpcAddress"), mockEntry.getRpcAddress()); + assertEquals(json.getString("webAddress"), mockEntry.getWebAddress()); + nnsFound++; + } + // Validate all memberships are present + assertEquals(getActiveMemberships().size() + getStandbyMemberships().size(), + nnsFound); + } + + @Test + public void testNameserviceStatsDataSource() + throws IOException, JSONException { + + FederationMetrics metrics = getRouter().getMetrics(); + String jsonString = metrics.getNameservices(); + JSONObject jsonObject = new JSONObject(jsonString); + Iterator<?> keys = jsonObject.keys(); + int nameservicesFound = 0; + while (keys.hasNext()) { + JSONObject json = jsonObject.getJSONObject((String) keys.next()); + String nameserviceId = json.getString("nameserviceId"); + String namenodeId = json.getString("namenodeId"); + + MembershipState mockEntry = + this.findMockNamenode(nameserviceId, namenodeId); + assertNotNull(mockEntry); + + // NS should report the active NN + assertEquals(mockEntry.getState().toString(), json.getString("state")); + assertEquals("ACTIVE", json.getString("state")); + + // Stats in the NS should reflect the stats for the most active NN + MembershipStats stats = mockEntry.getStats(); + assertEquals(stats.getNumOfFiles(), json.getLong("numOfFiles")); + assertEquals(stats.getTotalSpace(), json.getLong("totalSpace")); + assertEquals(stats.getAvailableSpace(), + json.getLong("availableSpace")); + assertEquals(stats.getNumOfBlocksMissing(), + json.getLong("numOfBlocksMissing")); + assertEquals(stats.getNumOfActiveDatanodes(), + json.getLong("numOfActiveDatanodes")); + assertEquals(stats.getNumOfDeadDatanodes(), + json.getLong("numOfDeadDatanodes")); + assertEquals(stats.getNumOfDecommissioningDatanodes(), + json.getLong("numOfDecommissioningDatanodes")); + assertEquals(stats.getNumOfDecomActiveDatanodes(), + json.getLong("numOfDecomActiveDatanodes")); + assertEquals(stats.getNumOfDecomDeadDatanodes(), + json.getLong("numOfDecomDeadDatanodes")); + assertEquals(stats.getProvidedSpace(), + json.getLong("providedSpace")); + nameservicesFound++; + } + assertEquals(getNameservices().size(), nameservicesFound); + } + + @Test + public void testRouterStatsDataSource() throws IOException, JSONException { + + FederationMetrics metrics = getRouter().getMetrics(); + String jsonString = metrics.getRouters(); + JSONObject jsonObject = new JSONObject(jsonString); + Iterator<?> keys = jsonObject.keys(); + int routersFound = 0; + while (keys.hasNext()) { + JSONObject json = jsonObject.getJSONObject((String) keys.next()); + String address = json.getString("address"); + assertNotNullAndNotEmpty(address); + RouterState router = findMockRouter(address); + assertNotNull(router); + + assertEquals(router.getStatus().toString(), json.getString("status")); + assertEquals(router.getCompileInfo(), json.getString("compileInfo")); + assertEquals(router.getVersion(), json.getString("version")); + assertEquals(router.getDateStarted(), json.getLong("dateStarted")); + assertEquals(router.getDateCreated(), json.getLong("dateCreated")); + assertEquals(router.getDateModified(), json.getLong("dateModified")); + + StateStoreVersion version = router.getStateStoreVersion(); + assertEquals( + FederationMetrics.getDateString(version.getMembershipVersion()), + json.get("lastMembershipUpdate")); + assertEquals( + FederationMetrics.getDateString(version.getMountTableVersion()), + json.get("lastMountTableUpdate")); + assertEquals(version.getMembershipVersion(), + json.get("membershipVersion")); + assertEquals(version.getMountTableVersion(), + json.get("mountTableVersion")); + routersFound++; + } + + assertEquals(getMockRouters().size(), routersFound); + } + + private void assertNotNullAndNotEmpty(String field) { + assertNotNull(field); + assertTrue(field.length() > 0); + } + + private RouterState findMockRouter(String routerId) { + for (RouterState router : getMockRouters()) { + if (router.getAddress().equals(routerId)) { + return router; + } + } + return null; + } + + private void validateClusterStatsBean(FederationMBean bean) + throws IOException { + + // Determine aggregates + long numBlocks = 0; + long numLive = 0; + long numDead = 0; + long numDecom = 0; + long numDecomLive = 0; + long numDecomDead = 0; + long numFiles = 0; + for (MembershipState mock : getActiveMemberships()) { + MembershipStats stats = mock.getStats(); + numBlocks += stats.getNumOfBlocks(); + numLive += stats.getNumOfActiveDatanodes(); + numDead += stats.getNumOfDeadDatanodes(); + numDecom += stats.getNumOfDecommissioningDatanodes(); + numDecomLive += stats.getNumOfDecomActiveDatanodes(); + numDecomDead += stats.getNumOfDecomDeadDatanodes(); + } + + assertEquals(numBlocks, bean.getNumBlocks()); + assertEquals(numLive, bean.getNumLiveNodes()); + assertEquals(numDead, bean.getNumDeadNodes()); + assertEquals(numDecom, bean.getNumDecommissioningNodes()); + assertEquals(numDecomLive, bean.getNumDecomLiveNodes()); + assertEquals(numDecomDead, bean.getNumDecomDeadNodes()); + assertEquals(numFiles, bean.getNumFiles()); + assertEquals(getActiveMemberships().size() + getStandbyMemberships().size(), + bean.getNumNamenodes()); + assertEquals(getNameservices().size(), bean.getNumNameservices()); + assertTrue(bean.getVersion().length() > 0); + assertTrue(bean.getCompiledDate().length() > 0); + assertTrue(bean.getCompileInfo().length() > 0); + assertTrue(bean.getRouterStarted().length() > 0); + assertTrue(bean.getHostAndPort().length() > 0); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java new file mode 100644 index 0000000..2169b21 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearAllRecords; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockMountTable; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockRegistrationForNamenode; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.waitStateStore; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState; +import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; +import org.apache.hadoop.hdfs.server.federation.store.RouterStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; +import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion; +import org.apache.hadoop.util.Time; +import org.junit.After; +import org.junit.Before; + +/** + * Test the basic metrics functionality. + */ +public class TestMetricsBase { + + private StateStoreService stateStore; + private MembershipStore membershipStore; + private RouterStore routerStore; + private Router router; + private Configuration routerConfig; + + private List<MembershipState> activeMemberships; + private List<MembershipState> standbyMemberships; + private List<MountTable> mockMountTable; + private List<RouterState> mockRouters; + private List<String> nameservices; + + @Before + public void setupBase() throws Exception { + + if (router == null) { + routerConfig = new RouterConfigBuilder() + .stateStore() + .metrics() + .http() + .build(); + router = new Router(); + router.init(routerConfig); + router.setRouterId("routerId"); + router.start(); + stateStore = router.getStateStore(); + + membershipStore = + stateStore.getRegisteredRecordStore(MembershipStore.class); + routerStore = stateStore.getRegisteredRecordStore(RouterStore.class); + + // Read all data and load all caches + waitStateStore(stateStore, 10000); + createFixtures(); + stateStore.refreshCaches(true); + Thread.sleep(1000); + } + } + + @After + public void tearDownBase() throws IOException { + if (router != null) { + router.stop(); + router.close(); + router = null; + } + } + + private void createFixtures() throws IOException { + // Clear all records + clearAllRecords(stateStore); + + nameservices = new ArrayList<>(); + nameservices.add(NAMESERVICES[0]); + nameservices.add(NAMESERVICES[1]); + + // 2 NNs per NS + activeMemberships = new ArrayList<>(); + standbyMemberships = new ArrayList<>(); + + for (String nameservice : nameservices) { + MembershipState namenode1 = createMockRegistrationForNamenode( + nameservice, NAMENODES[0], FederationNamenodeServiceState.ACTIVE); + NamenodeHeartbeatRequest request1 = + NamenodeHeartbeatRequest.newInstance(namenode1); + assertTrue(membershipStore.namenodeHeartbeat(request1).getResult()); + activeMemberships.add(namenode1); + + MembershipState namenode2 = createMockRegistrationForNamenode( + nameservice, NAMENODES[1], FederationNamenodeServiceState.STANDBY); + NamenodeHeartbeatRequest request2 = + NamenodeHeartbeatRequest.newInstance(namenode2); + assertTrue(membershipStore.namenodeHeartbeat(request2).getResult()); + standbyMemberships.add(namenode2); + } + + // Add 2 mount table memberships + mockMountTable = createMockMountTable(nameservices); + synchronizeRecords(stateStore, mockMountTable, MountTable.class); + + // Add 2 router memberships in addition to the running router. + long t1 = Time.now(); + mockRouters = new ArrayList<>(); + RouterState router1 = RouterState.newInstance( + "router1", t1, RouterServiceState.RUNNING); + router1.setStateStoreVersion(StateStoreVersion.newInstance( + t1 - 1000, t1 - 2000)); + RouterHeartbeatRequest heartbeatRequest = + RouterHeartbeatRequest.newInstance(router1); + assertTrue(routerStore.routerHeartbeat(heartbeatRequest).getStatus()); + + GetRouterRegistrationRequest getRequest = + GetRouterRegistrationRequest.newInstance("router1"); + GetRouterRegistrationResponse getResponse = + routerStore.getRouterRegistration(getRequest); + RouterState routerState1 = getResponse.getRouter(); + mockRouters.add(routerState1); + + long t2 = Time.now(); + RouterState router2 = RouterState.newInstance( + "router2", t2, RouterServiceState.RUNNING); + router2.setStateStoreVersion(StateStoreVersion.newInstance( + t2 - 6000, t2 - 7000)); + heartbeatRequest.setRouter(router2); + assertTrue(routerStore.routerHeartbeat(heartbeatRequest).getStatus()); + getRequest.setRouterId("router2"); + getResponse = routerStore.getRouterRegistration(getRequest); + RouterState routerState2 = getResponse.getRouter(); + mockRouters.add(routerState2); + } + + protected Router getRouter() { + return router; + } + + protected List<MountTable> getMockMountTable() { + return mockMountTable; + } + + protected List<MembershipState> getActiveMemberships() { + return activeMemberships; + } + + protected List<MembershipState> getStandbyMemberships() { + return standbyMemberships; + } + + protected List<String> getNameservices() { + return nameservices; + } + + protected List<RouterState> getMockRouters() { + return mockRouters; + } + + protected StateStoreService getStateStore() { + return stateStore; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java new file mode 100644 index 0000000..cb3b472 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java @@ -0,0 +1,528 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test the {@link MountTableStore} from the {@link Router}. + */ +public class TestMountTableResolver { + + private static final Logger LOG = + LoggerFactory.getLogger(TestMountTableResolver.class); + + private static final int TEST_MAX_CACHE_SIZE = 10; + + private MountTableResolver mountTable; + + private Map<String, String> getMountTableEntry( + String subcluster, String path) { + Map<String, String> ret = new HashMap<>(); + ret.put(subcluster, path); + return ret; + } + + /** + * Setup the mount table. + * / -> 1:/ + * __tmp -> 2:/tmp + * __user -> 3:/user + * ____a -> 2:/user/test + * ______demo + * ________test + * __________a -> 1:/user/test + * __________b -> 3:/user/test + * ____b + * ______file1.txt -> 4:/user/file1.txt + * __usr + * ____bin -> 2:/bin + * __readonly -> 2:/tmp + * + * @throws IOException If it cannot set the mount table. + */ + private void setupMountTable() throws IOException { + Configuration conf = new Configuration(); + conf.setInt( + FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE, TEST_MAX_CACHE_SIZE); + conf.setStrings(DFS_ROUTER_DEFAULT_NAMESERVICE, "0"); + mountTable = new MountTableResolver(conf); + + // Root mount point + Map<String, String> map = getMountTableEntry("1", "/"); + mountTable.addEntry(MountTable.newInstance("/", map)); + + // /tmp + map = getMountTableEntry("2", "/"); + mountTable.addEntry(MountTable.newInstance("/tmp", map)); + + // /user + map = getMountTableEntry("3", "/user"); + mountTable.addEntry(MountTable.newInstance("/user", map)); + + // /usr/bin + map = getMountTableEntry("2", "/bin"); + mountTable.addEntry(MountTable.newInstance("/usr/bin", map)); + + // /user/a + map = getMountTableEntry("2", "/user/test"); + mountTable.addEntry(MountTable.newInstance("/user/a", map)); + + // /user/b/file1.txt + map = getMountTableEntry("4", "/user/file1.txt"); + mountTable.addEntry(MountTable.newInstance("/user/b/file1.txt", map)); + + // /user/a/demo/test/a + map = getMountTableEntry("1", "/user/test"); + mountTable.addEntry(MountTable.newInstance("/user/a/demo/test/a", map)); + + // /user/a/demo/test/b + map = getMountTableEntry("3", "/user/test"); + mountTable.addEntry(MountTable.newInstance("/user/a/demo/test/b", map)); + + // /readonly + map = getMountTableEntry("2", "/tmp"); + MountTable readOnlyEntry = MountTable.newInstance("/readonly", map); + readOnlyEntry.setReadOnly(true); + mountTable.addEntry(readOnlyEntry); + } + + @Before + public void setup() throws IOException { + setupMountTable(); + } + + @Test + public void testDestination() throws IOException { + + // Check files + assertEquals("1->/tesfile1.txt", + mountTable.getDestinationForPath("/tesfile1.txt").toString()); + + assertEquals("3->/user/testfile2.txt", + mountTable.getDestinationForPath("/user/testfile2.txt").toString()); + + assertEquals("2->/user/test/testfile3.txt", + mountTable.getDestinationForPath("/user/a/testfile3.txt").toString()); + + assertEquals("3->/user/b/testfile4.txt", + mountTable.getDestinationForPath("/user/b/testfile4.txt").toString()); + + assertEquals("1->/share/file5.txt", + mountTable.getDestinationForPath("/share/file5.txt").toString()); + + assertEquals("2->/bin/file7.txt", + mountTable.getDestinationForPath("/usr/bin/file7.txt").toString()); + + assertEquals("1->/usr/file8.txt", + mountTable.getDestinationForPath("/usr/file8.txt").toString()); + + assertEquals("2->/user/test/demo/file9.txt", + mountTable.getDestinationForPath("/user/a/demo/file9.txt").toString()); + + // Check folders + assertEquals("3->/user/testfolder", + mountTable.getDestinationForPath("/user/testfolder").toString()); + + assertEquals("2->/user/test/b", + mountTable.getDestinationForPath("/user/a/b").toString()); + + assertEquals("3->/user/test/a", + mountTable.getDestinationForPath("/user/test/a").toString()); + + assertEquals("2->/tmp/tesfile1.txt", + mountTable.getDestinationForPath("/readonly/tesfile1.txt").toString()); + + } + + private void compareLists(List<String> list1, String[] list2) { + assertEquals(list1.size(), list2.length); + for (String item : list2) { + assertTrue(list1.contains(item)); + } + } + + @Test + public void testGetMountPoint() throws IOException { + // Check get the mount table entry for a path + MountTable mtEntry; + mtEntry = mountTable.getMountPoint("/"); + assertTrue(mtEntry.getSourcePath().equals("/")); + + mtEntry = mountTable.getMountPoint("/user"); + assertTrue(mtEntry.getSourcePath().equals("/user")); + + mtEntry = mountTable.getMountPoint("/user/a"); + assertTrue(mtEntry.getSourcePath().equals("/user/a")); + + mtEntry = mountTable.getMountPoint("/user/a/"); + assertTrue(mtEntry.getSourcePath().equals("/user/a")); + + mtEntry = mountTable.getMountPoint("/user/a/11"); + assertTrue(mtEntry.getSourcePath().equals("/user/a")); + + mtEntry = mountTable.getMountPoint("/user/a1"); + assertTrue(mtEntry.getSourcePath().equals("/user")); + } + + @Test + public void testGetMountPoints() throws IOException { + + // Check getting all mount points (virtual and real) beneath a path + List<String> mounts = mountTable.getMountPoints("/"); + assertEquals(4, mounts.size()); + compareLists(mounts, new String[] {"tmp", "user", "usr", "readonly"}); + + mounts = mountTable.getMountPoints("/user"); + assertEquals(2, mounts.size()); + compareLists(mounts, new String[] {"a", "b"}); + + mounts = mountTable.getMountPoints("/user/a"); + assertEquals(1, mounts.size()); + compareLists(mounts, new String[] {"demo"}); + + mounts = mountTable.getMountPoints("/user/a/demo"); + assertEquals(1, mounts.size()); + compareLists(mounts, new String[] {"test"}); + + mounts = mountTable.getMountPoints("/user/a/demo/test"); + assertEquals(2, mounts.size()); + compareLists(mounts, new String[] {"a", "b"}); + + mounts = mountTable.getMountPoints("/tmp"); + assertEquals(0, mounts.size()); + + mounts = mountTable.getMountPoints("/t"); + assertNull(mounts); + + mounts = mountTable.getMountPoints("/unknownpath"); + assertNull(mounts); + } + + private void compareRecords(List<MountTable> list1, String[] list2) { + assertEquals(list1.size(), list2.length); + for (String item : list2) { + for (MountTable record : list1) { + if (record.getSourcePath().equals(item)) { + return; + } + } + } + fail(); + } + + @Test + public void testGetMounts() throws IOException { + + // Check listing the mount table records at or beneath a path + List<MountTable> records = mountTable.getMounts("/"); + assertEquals(9, records.size()); + compareRecords(records, new String[] {"/", "/tmp", "/user", "/usr/bin", + "user/a", "/user/a/demo/a", "/user/a/demo/b", "/user/b/file1.txt", + "readonly"}); + + records = mountTable.getMounts("/user"); + assertEquals(5, records.size()); + compareRecords(records, new String[] {"/user", "/user/a/demo/a", + "/user/a/demo/b", "user/a", "/user/b/file1.txt"}); + + records = mountTable.getMounts("/user/a"); + assertEquals(3, records.size()); + compareRecords(records, + new String[] {"/user/a/demo/a", "/user/a/demo/b", "/user/a"}); + + records = mountTable.getMounts("/tmp"); + assertEquals(1, records.size()); + compareRecords(records, new String[] {"/tmp"}); + + records = mountTable.getMounts("/readonly"); + assertEquals(1, records.size()); + compareRecords(records, new String[] {"/readonly"}); + assertTrue(records.get(0).isReadOnly()); + } + + @Test + public void testRemoveSubTree() + throws UnsupportedOperationException, IOException { + + // 3 mount points are present /tmp, /user, /usr + compareLists(mountTable.getMountPoints("/"), + new String[] {"user", "usr", "tmp", "readonly"}); + + // /tmp currently points to namespace 2 + assertEquals("2", mountTable.getDestinationForPath("/tmp/testfile.txt") + .getDefaultLocation().getNameserviceId()); + + // Remove tmp + mountTable.removeEntry("/tmp"); + + // Now 2 mount points are present /user, /usr + compareLists(mountTable.getMountPoints("/"), + new String[] {"user", "usr", "readonly"}); + + // /tmp no longer exists, uses default namespace for mapping / + assertEquals("1", mountTable.getDestinationForPath("/tmp/testfile.txt") + .getDefaultLocation().getNameserviceId()); + } + + @Test + public void testRemoveVirtualNode() + throws UnsupportedOperationException, IOException { + + // 3 mount points are present /tmp, /user, /usr + compareLists(mountTable.getMountPoints("/"), + new String[] {"user", "usr", "tmp", "readonly"}); + + // /usr is virtual, uses namespace 1->/ + assertEquals("1", mountTable.getDestinationForPath("/usr/testfile.txt") + .getDefaultLocation().getNameserviceId()); + + // Attempt to remove /usr + mountTable.removeEntry("/usr"); + + // Verify the remove failed + compareLists(mountTable.getMountPoints("/"), + new String[] {"user", "usr", "tmp", "readonly"}); + } + + @Test + public void testRemoveLeafNode() + throws UnsupportedOperationException, IOException { + + // /user/a/demo/test/a currently points to namespace 1 + assertEquals("1", mountTable.getDestinationForPath("/user/a/demo/test/a") + .getDefaultLocation().getNameserviceId()); + + // Remove /user/a/demo/test/a + mountTable.removeEntry("/user/a/demo/test/a"); + + // Now /user/a/demo/test/a points to namespace 2 using the entry for /user/a + assertEquals("2", mountTable.getDestinationForPath("/user/a/demo/test/a") + .getDefaultLocation().getNameserviceId()); + + // Verify the virtual node at /user/a/demo still exists and was not deleted + compareLists(mountTable.getMountPoints("/user/a"), new String[] {"demo"}); + + // Verify the sibling node was unaffected and still points to ns 3 + assertEquals("3", mountTable.getDestinationForPath("/user/a/demo/test/b") + .getDefaultLocation().getNameserviceId()); + } + + @Test + public void testRefreshEntries() + throws UnsupportedOperationException, IOException { + + // Initial table loaded + testDestination(); + assertEquals(9, mountTable.getMounts("/").size()); + + // Replace table with /1 and /2 + List<MountTable> records = new ArrayList<>(); + Map<String, String> map1 = getMountTableEntry("1", "/"); + records.add(MountTable.newInstance("/1", map1)); + Map<String, String> map2 = getMountTableEntry("2", "/"); + records.add(MountTable.newInstance("/2", map2)); + mountTable.refreshEntries(records); + + // Verify addition + PathLocation destination1 = mountTable.getDestinationForPath("/1"); + RemoteLocation defaultLoc1 = destination1.getDefaultLocation(); + assertEquals("1", defaultLoc1.getNameserviceId()); + + PathLocation destination2 = mountTable.getDestinationForPath("/2"); + RemoteLocation defaultLoc2 = destination2.getDefaultLocation(); + assertEquals("2", defaultLoc2.getNameserviceId()); + + // Verify existing entries were removed + assertEquals(2, mountTable.getMounts("/").size()); + boolean assertionThrown = false; + try { + testDestination(); + fail(); + } catch (AssertionError e) { + // The / entry was removed, so it triggers an exception + assertionThrown = true; + } + assertTrue(assertionThrown); + } + + @Test + public void testMountTableScalability() throws IOException { + + List<MountTable> emptyList = new ArrayList<>(); + mountTable.refreshEntries(emptyList); + + // Add 100,000 entries in flat list + for (int i = 0; i < 100000; i++) { + Map<String, String> map = getMountTableEntry("1", "/" + i); + MountTable record = MountTable.newInstance("/" + i, map); + mountTable.addEntry(record); + if (i % 10000 == 0) { + LOG.info("Adding flat mount record {}: {}", i, record); + } + } + + assertEquals(100000, mountTable.getMountPoints("/").size()); + assertEquals(100000, mountTable.getMounts("/").size()); + + // Add 1000 entries in deep list + mountTable.refreshEntries(emptyList); + String parent = "/"; + for (int i = 0; i < 1000; i++) { + final int index = i; + Map<String, String> map = getMountTableEntry("1", "/" + index); + if (i > 0) { + parent = parent + "/"; + } + parent = parent + i; + MountTable record = MountTable.newInstance(parent, map); + mountTable.addEntry(record); + } + + assertEquals(1, mountTable.getMountPoints("/").size()); + assertEquals(1000, mountTable.getMounts("/").size()); + + // Add 100,000 entries in deep and wide tree + mountTable.refreshEntries(emptyList); + Random rand = new Random(); + parent = "/" + Integer.toString(rand.nextInt()); + int numRootTrees = 1; + for (int i = 0; i < 100000; i++) { + final int index = i; + Map<String, String> map = getMountTableEntry("1", "/" + index); + parent = parent + "/" + i; + if (parent.length() > 2000) { + // Start new tree + parent = "/" + Integer.toString(rand.nextInt()); + numRootTrees++; + } + MountTable record = MountTable.newInstance(parent, map); + mountTable.addEntry(record); + } + + assertEquals(numRootTrees, mountTable.getMountPoints("/").size()); + assertEquals(100000, mountTable.getMounts("/").size()); + } + + @Test + public void testUpdate() throws IOException { + + // Add entry to update later + Map<String, String> map = getMountTableEntry("1", "/"); + mountTable.addEntry(MountTable.newInstance("/testupdate", map)); + + MountTable entry = mountTable.getMountPoint("/testupdate"); + List<RemoteLocation> dests = entry.getDestinations(); + assertEquals(1, dests.size()); + RemoteLocation dest = dests.get(0); + assertEquals("1", dest.getNameserviceId()); + + // Update entry + Collection<MountTable> entries = Collections.singletonList( + MountTable.newInstance("/testupdate", getMountTableEntry("2", "/"))); + mountTable.refreshEntries(entries); + + MountTable entry1 = mountTable.getMountPoint("/testupdate"); + List<RemoteLocation> dests1 = entry1.getDestinations(); + assertEquals(1, dests1.size()); + RemoteLocation dest1 = dests1.get(0); + assertEquals("2", dest1.getNameserviceId()); + + // Remove the entry to test updates and check + mountTable.removeEntry("/testupdate"); + MountTable entry2 = mountTable.getMountPoint("/testupdate"); + assertNull(entry2); + } + + @Test + public void testCacheCleaning() throws Exception { + for (int i = 0; i < 1000; i++) { + String filename = String.format("/user/a/file-%04d.txt", i); + mountTable.getDestinationForPath(filename); + } + long cacheSize = mountTable.getCacheSize(); + assertTrue(cacheSize <= TEST_MAX_CACHE_SIZE); + } + + @Test + public void testLocationCache() throws Exception { + List<MountTable> entries = new ArrayList<>(); + + // Add entry and test location cache + Map<String, String> map1 = getMountTableEntry("1", "/testlocationcache"); + MountTable entry1 = MountTable.newInstance("/testlocationcache", map1); + entries.add(entry1); + + Map<String, String> map2 = getMountTableEntry("2", + "/anothertestlocationcache"); + MountTable entry2 = MountTable.newInstance("/anothertestlocationcache", + map2); + entries.add(entry2); + mountTable.refreshEntries(entries); + assertEquals("1->/testlocationcache", + mountTable.getDestinationForPath("/testlocationcache").toString()); + assertEquals("2->/anothertestlocationcache", + mountTable.getDestinationForPath("/anothertestlocationcache") + .toString()); + + // Remove the entry1 + entries.remove(entry1); + mountTable.refreshEntries(entries); + + // Add the default location and test location cache + assertEquals("0->/testlocationcache", + mountTable.getDestinationForPath("/testlocationcache").toString()); + + // Add the entry again but mount to another ns + Map<String, String> map3 = getMountTableEntry("3", "/testlocationcache"); + MountTable entry3 = MountTable.newInstance("/testlocationcache", map3); + entries.add(entry3); + mountTable.refreshEntries(entries); + + // Ensure location cache update correctly + assertEquals("3->/testlocationcache", + mountTable.getDestinationForPath("/testlocationcache").toString()); + + // Cleanup before exit + mountTable.removeEntry("/testlocationcache"); + mountTable.removeEntry("/anothertestlocationcache"); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org