http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a1a5fe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java index ee6f57d..2875750 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java @@ -43,7 +43,7 @@ import org.apache.hadoop.util.Time; /** * In-memory cache/mock of a namenode and file resolver. Stores the most - * recently updated NN information for each nameservice and block pool. Also + * recently updated NN information for each nameservice and block pool. It also * stores a virtual mount table for resolving global namespace paths to local NN * paths. */ @@ -51,82 +51,93 @@ public class MockResolver implements ActiveNamenodeResolver, FileSubclusterResolver { private Map<String, List<? extends FederationNamenodeContext>> resolver = - new HashMap<String, List<? extends FederationNamenodeContext>>(); - private Map<String, List<RemoteLocation>> locations = - new HashMap<String, List<RemoteLocation>>(); - private Set<FederationNamespaceInfo> namespaces = - new HashSet<FederationNamespaceInfo>(); + new HashMap<>(); + private Map<String, List<RemoteLocation>> locations = new HashMap<>(); + private Set<FederationNamespaceInfo> namespaces = new HashSet<>(); private String defaultNamespace = null; + public MockResolver(Configuration conf, StateStoreService store) { this.cleanRegistrations(); } - public void addLocation(String mount, String nameservice, String location) { - RemoteLocation remoteLocation = new RemoteLocation(nameservice, location); - List<RemoteLocation> locationsList = locations.get(mount); + public void addLocation(String mount, String nsId, String location) { + List<RemoteLocation> locationsList = this.locations.get(mount); if (locationsList == null) { - locationsList = new LinkedList<RemoteLocation>(); - locations.put(mount, locationsList); + locationsList = new LinkedList<>(); + this.locations.put(mount, locationsList); } + + final RemoteLocation remoteLocation = new RemoteLocation(nsId, location); if (!locationsList.contains(remoteLocation)) { locationsList.add(remoteLocation); } if (this.defaultNamespace == null) { - this.defaultNamespace = nameservice; + this.defaultNamespace = nsId; } } public synchronized void cleanRegistrations() { - this.resolver = - new HashMap<String, List<? extends FederationNamenodeContext>>(); - this.namespaces = new HashSet<FederationNamespaceInfo>(); + this.resolver = new HashMap<>(); + this.namespaces = new HashSet<>(); } @Override public void updateActiveNamenode( - String ns, InetSocketAddress successfulAddress) { + String nsId, InetSocketAddress successfulAddress) { String address = successfulAddress.getHostName() + ":" + successfulAddress.getPort(); - String key = ns; + String key = nsId; if (key != null) { // Update the active entry @SuppressWarnings("unchecked") - List<FederationNamenodeContext> iterator = - (List<FederationNamenodeContext>) resolver.get(key); - for (FederationNamenodeContext namenode : iterator) { + List<FederationNamenodeContext> namenodes = + (List<FederationNamenodeContext>) this.resolver.get(key); + for (FederationNamenodeContext namenode : namenodes) { if (namenode.getRpcAddress().equals(address)) { MockNamenodeContext nn = (MockNamenodeContext) namenode; nn.setState(FederationNamenodeServiceState.ACTIVE); break; } } - Collections.sort(iterator, new NamenodePriorityComparator()); + // This operation modifies the list so we need to be careful + synchronized(namenodes) { + Collections.sort(namenodes, new NamenodePriorityComparator()); + } } } @Override public List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(String nameserviceId) { - return resolver.get(nameserviceId); + // Return a copy of the list because it is updated periodically + List<? extends FederationNamenodeContext> namenodes = + this.resolver.get(nameserviceId); + return Collections.unmodifiableList(new ArrayList<>(namenodes)); } @Override public List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId( String blockPoolId) { - return resolver.get(blockPoolId); + // Return a copy of the list because it is updated periodically + List<? extends FederationNamenodeContext> namenodes = + this.resolver.get(blockPoolId); + return Collections.unmodifiableList(new ArrayList<>(namenodes)); } private static class MockNamenodeContext implements FederationNamenodeContext { + + private String namenodeId; + private String nameserviceId; + private String webAddress; private String rpcAddress; private String serviceAddress; private String lifelineAddress; - private String namenodeId; - private String nameserviceId; + private FederationNamenodeServiceState state; private long dateModified; @@ -197,6 +208,7 @@ public class MockResolver @Override public synchronized boolean registerNamenode(NamenodeStatusReport report) throws IOException { + MockNamenodeContext context = new MockNamenodeContext( report.getRpcAddress(), report.getServiceAddress(), report.getLifelineAddress(), report.getWebAddress(), @@ -205,13 +217,14 @@ public class MockResolver String nsId = report.getNameserviceId(); String bpId = report.getBlockPoolId(); String cId = report.getClusterId(); + @SuppressWarnings("unchecked") List<MockNamenodeContext> existingItems = - (List<MockNamenodeContext>) resolver.get(nsId); + (List<MockNamenodeContext>) this.resolver.get(nsId); if (existingItems == null) { - existingItems = new ArrayList<MockNamenodeContext>(); - resolver.put(bpId, existingItems); - resolver.put(nsId, existingItems); + existingItems = new ArrayList<>(); + this.resolver.put(bpId, existingItems); + this.resolver.put(nsId, existingItems); } boolean added = false; for (int i=0; i<existingItems.size() && !added; i++) { @@ -227,7 +240,7 @@ public class MockResolver Collections.sort(existingItems, new NamenodePriorityComparator()); FederationNamespaceInfo info = new FederationNamespaceInfo(bpId, cId, nsId); - namespaces.add(info); + this.namespaces.add(info); return true; } @@ -238,16 +251,13 @@ public class MockResolver @Override public PathLocation getDestinationForPath(String path) throws IOException { - String finalPath = null; - String nameservice = null; - Set<String> namespaceSet = new HashSet<String>(); - LinkedList<RemoteLocation> remoteLocations = - new LinkedList<RemoteLocation>(); - for(String key : this.locations.keySet()) { - if(path.startsWith(key)) { + Set<String> namespaceSet = new HashSet<>(); + List<RemoteLocation> remoteLocations = new LinkedList<>(); + for (String key : this.locations.keySet()) { + if (path.startsWith(key)) { for (RemoteLocation location : this.locations.get(key)) { - finalPath = location.getDest() + path.substring(key.length()); - nameservice = location.getNameserviceId(); + String finalPath = location.getDest() + path.substring(key.length()); + String nameservice = location.getNameserviceId(); RemoteLocation remoteLocation = new RemoteLocation(nameservice, finalPath); remoteLocations.add(remoteLocation); @@ -265,7 +275,7 @@ public class MockResolver @Override public List<String> getMountPoints(String path) throws IOException { - List<String> mounts = new ArrayList<String>(); + List<String> mounts = new ArrayList<>(); if (path.equals("/")) { // Mounts only supported under root level for (String mount : this.locations.keySet()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a1a5fe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java index 16d624c..39fcf7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.federation; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; /** * Constructs a router configuration with individual features enabled/disabled. @@ -26,15 +27,32 @@ public class RouterConfigBuilder { private Configuration conf; + private boolean enableRpcServer = false; + public RouterConfigBuilder(Configuration configuration) { this.conf = configuration; } public RouterConfigBuilder() { - this.conf = new Configuration(); + this.conf = new Configuration(false); + } + + public RouterConfigBuilder all() { + this.enableRpcServer = true; + return this; + } + + public RouterConfigBuilder rpc(boolean enable) { + this.enableRpcServer = enable; + return this; + } + + public RouterConfigBuilder rpc() { + return this.rpc(true); } public Configuration build() { + conf.setBoolean(DFSConfigKeys.DFS_ROUTER_RPC_ENABLE, this.enableRpcServer); return conf; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a1a5fe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java index 55d04ad..4031b7f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java @@ -17,27 +17,44 @@ */ package org.apache.hadoop.hdfs.server.federation; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS; +import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.addDirectory; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.waitNamenodeRegistered; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; import java.io.IOException; +import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; +import java.util.Map.Entry; import java.util.Random; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.hdfs.DFSClient; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -46,16 +63,49 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf; import org.apache.hadoop.hdfs.MiniDFSNNTopology.NSConf; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service.STATE; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Test utility to mimic a federated HDFS cluster with a router. + * Test utility to mimic a federated HDFS cluster with multiple routers. */ public class RouterDFSCluster { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterDFSCluster.class); + + public static final String TEST_STRING = "teststring"; + public static final String TEST_DIR = "testdir"; + public static final String TEST_FILE = "testfile"; + + + /** Nameservices in the federated cluster. */ + private List<String> nameservices; + /** Namenodes in the federated cluster. */ + private List<NamenodeContext> namenodes; + /** Routers in the federated cluster. */ + private List<RouterContext> routers; + /** If the Namenodes are in high availability.*/ + private boolean highAvailability; + + /** Mini cluster. */ + private MiniDFSCluster cluster; + + /** Router configuration overrides. */ + private Configuration routerOverrides; + /** Namenode configuration overrides. */ + private Configuration namenodeOverrides; + + /** * Router context. */ @@ -69,13 +119,14 @@ public class RouterDFSCluster { private Configuration conf; private URI fileSystemUri; - public RouterContext(Configuration conf, String ns, String nn) + public RouterContext(Configuration conf, String nsId, String nnId) throws URISyntaxException { - this.namenodeId = nn; - this.nameserviceId = ns; this.conf = conf; - router = new Router(); - router.init(conf); + this.nameserviceId = nsId; + this.namenodeId = nnId; + + this.router = new Router(); + this.router.init(conf); } public Router getRouter() { @@ -99,18 +150,30 @@ public class RouterDFSCluster { } public void initRouter() throws URISyntaxException { + // Store the bound points for the router interfaces + InetSocketAddress rpcAddress = router.getRpcServerAddress(); + if (rpcAddress != null) { + this.rpcPort = rpcAddress.getPort(); + this.fileSystemUri = + URI.create("hdfs://" + NetUtils.getHostPortString(rpcAddress)); + // Override the default FS to point to the router RPC + DistributedFileSystem.setDefaultUri(conf, fileSystemUri); + try { + this.fileContext = FileContext.getFileContext(conf); + } catch (UnsupportedFileSystemException e) { + this.fileContext = null; + } + } } - public DistributedFileSystem getFileSystem() throws IOException { - DistributedFileSystem fs = - (DistributedFileSystem) DistributedFileSystem.get(conf); - return fs; + public FileSystem getFileSystem() throws IOException { + return DistributedFileSystem.get(conf); } public DFSClient getClient(UserGroupInformation user) throws IOException, URISyntaxException, InterruptedException { - LOG.info("Connecting to router at " + fileSystemUri); + LOG.info("Connecting to router at {}", fileSystemUri); return user.doAs(new PrivilegedExceptionAction<DFSClient>() { @Override public DFSClient run() throws IOException { @@ -120,9 +183,8 @@ public class RouterDFSCluster { } public DFSClient getClient() throws IOException, URISyntaxException { - if (client == null) { - LOG.info("Connecting to router at " + fileSystemUri); + LOG.info("Connecting to router at {}", fileSystemUri); client = new DFSClient(fileSystemUri, conf); } return client; @@ -130,9 +192,10 @@ public class RouterDFSCluster { } /** - * Namenode context. + * Namenode context in the federated cluster. */ public class NamenodeContext { + private Configuration conf; private NameNode namenode; private String nameserviceId; private String namenodeId; @@ -143,14 +206,13 @@ public class RouterDFSCluster { private int httpPort; private URI fileSystemUri; private int index; - private Configuration conf; private DFSClient client; - public NamenodeContext(Configuration conf, String ns, String nn, - int index) { + public NamenodeContext( + Configuration conf, String nsId, String nnId, int index) { this.conf = conf; - this.namenodeId = nn; - this.nameserviceId = ns; + this.nameserviceId = nsId; + this.namenodeId = nnId; this.index = index; } @@ -170,20 +232,19 @@ public class RouterDFSCluster { return this.fileContext; } - public void setNamenode(NameNode n) throws URISyntaxException { - namenode = n; + public void setNamenode(NameNode nn) throws URISyntaxException { + this.namenode = nn; - // Store the bound ports and override the default FS with the local NN's - // RPC - rpcPort = n.getNameNodeAddress().getPort(); - servicePort = n.getServiceRpcAddress().getPort(); - lifelinePort = n.getServiceRpcAddress().getPort(); - httpPort = n.getHttpAddress().getPort(); - fileSystemUri = new URI("hdfs://" + namenode.getHostAndPort()); - DistributedFileSystem.setDefaultUri(conf, fileSystemUri); + // Store the bound ports and override the default FS with the local NN RPC + this.rpcPort = nn.getNameNodeAddress().getPort(); + this.servicePort = nn.getServiceRpcAddress().getPort(); + this.lifelinePort = nn.getServiceRpcAddress().getPort(); + this.httpPort = nn.getHttpAddress().getPort(); + this.fileSystemUri = new URI("hdfs://" + namenode.getHostAndPort()); + DistributedFileSystem.setDefaultUri(this.conf, this.fileSystemUri); try { - this.fileContext = FileContext.getFileContext(conf); + this.fileContext = FileContext.getFileContext(this.conf); } catch (UnsupportedFileSystemException e) { this.fileContext = null; } @@ -205,10 +266,8 @@ public class RouterDFSCluster { return namenode.getHttpAddress().getHostName() + ":" + httpPort; } - public DistributedFileSystem getFileSystem() throws IOException { - DistributedFileSystem fs = - (DistributedFileSystem) DistributedFileSystem.get(conf); - return fs; + public FileSystem getFileSystem() throws IOException { + return DistributedFileSystem.get(conf); } public void resetClient() { @@ -218,7 +277,7 @@ public class RouterDFSCluster { public DFSClient getClient(UserGroupInformation user) throws IOException, URISyntaxException, InterruptedException { - LOG.info("Connecting to namenode at " + fileSystemUri); + LOG.info("Connecting to namenode at {}", fileSystemUri); return user.doAs(new PrivilegedExceptionAction<DFSClient>() { @Override public DFSClient run() throws IOException { @@ -229,7 +288,7 @@ public class RouterDFSCluster { public DFSClient getClient() throws IOException, URISyntaxException { if (client == null) { - LOG.info("Connecting to namenode at " + fileSystemUri); + LOG.info("Connecting to namenode at {}", fileSystemUri); client = new DFSClient(fileSystemUri, conf); } return client; @@ -244,36 +303,20 @@ public class RouterDFSCluster { } } - public static final String NAMENODE1 = "nn0"; - public static final String NAMENODE2 = "nn1"; - public static final String NAMENODE3 = "nn2"; - public static final String TEST_STRING = "teststring"; - public static final String TEST_DIR = "testdir"; - public static final String TEST_FILE = "testfile"; - - private List<String> nameservices; - private List<RouterContext> routers; - private List<NamenodeContext> namenodes; - private static final Log LOG = LogFactory.getLog(RouterDFSCluster.class); - private MiniDFSCluster cluster; - private boolean highAvailability; - - protected static final int DEFAULT_HEARTBEAT_INTERVAL = 5; - protected static final int DEFAULT_CACHE_INTERVAL_SEC = 5; - private Configuration routerOverrides; - private Configuration namenodeOverrides; - - private static final String NAMENODES = NAMENODE1 + "," + NAMENODE2; - - public RouterDFSCluster(boolean ha, int numNameservices) { - this(ha, numNameservices, 2); - } - public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes) { this.highAvailability = ha; configureNameservices(numNameservices, numNamenodes); } + public RouterDFSCluster(boolean ha, int numNameservices) { + this(ha, numNameservices, 2); + } + + /** + * Add configuration settings to override default Router settings. + * + * @param conf Router configuration overrides. + */ public void addRouterOverrides(Configuration conf) { if (this.routerOverrides == null) { this.routerOverrides = conf; @@ -282,6 +325,11 @@ public class RouterDFSCluster { } } + /** + * Add configuration settings to override default Namenode settings. + * + * @param conf Namenode configuration overrides. + */ public void addNamenodeOverrides(Configuration conf) { if (this.namenodeOverrides == null) { this.namenodeOverrides = conf; @@ -290,124 +338,134 @@ public class RouterDFSCluster { } } - public Configuration generateNamenodeConfiguration( - String defaultNameserviceId) { - Configuration c = new HdfsConfiguration(); + /** + * Generate the configuration for a client. + * + * @param nsId Nameservice identifier. + * @return New namenode configuration. + */ + public Configuration generateNamenodeConfiguration(String nsId) { + Configuration conf = new HdfsConfiguration(); - c.set(DFSConfigKeys.DFS_NAMESERVICES, getNameservicesKey()); - c.set("fs.defaultFS", "hdfs://" + defaultNameserviceId); + conf.set(DFS_NAMESERVICES, getNameservicesKey()); + conf.set(FS_DEFAULT_NAME_KEY, "hdfs://" + nsId); for (String ns : nameservices) { if (highAvailability) { - c.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, NAMENODES); + conf.set( + DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, + NAMENODES[0] + "," + NAMENODES[1]); } for (NamenodeContext context : getNamenodes(ns)) { String suffix = context.getConfSuffix(); - c.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix, + conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix, "127.0.0.1:" + context.rpcPort); - c.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix, + conf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix, "127.0.0.1:" + context.httpPort); - c.set(DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + suffix, + conf.set(DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + suffix, "0.0.0.0"); } } - if (namenodeOverrides != null) { - c.addResource(namenodeOverrides); + if (this.namenodeOverrides != null) { + conf.addResource(this.namenodeOverrides); } - return c; + return conf; } + /** + * Generate the configuration for a client. + * + * @return New configuration for a client. + */ public Configuration generateClientConfiguration() { - Configuration conf = new HdfsConfiguration(); - conf.addResource(generateNamenodeConfiguration(getNameservices().get(0))); + Configuration conf = new HdfsConfiguration(false); + String ns0 = getNameservices().get(0); + conf.addResource(generateNamenodeConfiguration(ns0)); return conf; } - public Configuration generateRouterConfiguration(String localNameserviceId, - String localNamenodeId) throws IOException { - Configuration conf = new HdfsConfiguration(); - conf.addResource(generateNamenodeConfiguration(localNameserviceId)); + /** + * Generate the configuration for a Router. + * + * @param nsId Nameservice identifier. + * @param nnId Namenode identifier. + * @return New configuration for a Router. + */ + public Configuration generateRouterConfiguration(String nsId, String nnId) { + + Configuration conf = new HdfsConfiguration(false); + conf.addResource(generateNamenodeConfiguration(nsId)); + + conf.setInt(DFS_ROUTER_HANDLER_COUNT_KEY, 10); + conf.set(DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0"); + + conf.set(DFS_ROUTER_DEFAULT_NAMESERVICE, nameservices.get(0)); // Use mock resolver classes - conf.set(DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, - MockResolver.class.getCanonicalName()); - conf.set(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS, - MockResolver.class.getCanonicalName()); + conf.setClass(FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, + MockResolver.class, ActiveNamenodeResolver.class); + conf.setClass(FEDERATION_FILE_RESOLVER_CLIENT_CLASS, + MockResolver.class, FileSubclusterResolver.class); // Set the nameservice ID for the default NN monitor - conf.set(DFSConfigKeys.DFS_NAMESERVICE_ID, localNameserviceId); - - if (localNamenodeId != null) { - conf.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, localNamenodeId); + conf.set(DFS_NAMESERVICE_ID, nsId); + if (nnId != null) { + conf.set(DFS_HA_NAMENODE_ID_KEY, nnId); } - StringBuilder routerBuilder = new StringBuilder(); - for (String ns : nameservices) { - for (NamenodeContext context : getNamenodes(ns)) { - String suffix = context.getConfSuffix(); - - if (routerBuilder.length() != 0) { - routerBuilder.append(","); - } - routerBuilder.append(suffix); + // Add custom overrides if available + if (this.routerOverrides != null) { + for (Entry<String, String> entry : this.routerOverrides) { + String confKey = entry.getKey(); + String confValue = entry.getValue(); + conf.set(confKey, confValue); } } - return conf; } public void configureNameservices(int numNameservices, int numNamenodes) { - nameservices = new ArrayList<String>(); - for (int i = 0; i < numNameservices; i++) { - nameservices.add("ns" + i); - } - namenodes = new ArrayList<NamenodeContext>(); - int index = 0; - for (String ns : nameservices) { - Configuration nnConf = generateNamenodeConfiguration(ns); - if (highAvailability) { - NamenodeContext context = - new NamenodeContext(nnConf, ns, NAMENODE1, index); - namenodes.add(context); - index++; - - if (numNamenodes > 1) { - context = new NamenodeContext(nnConf, ns, NAMENODE2, index + 1); - namenodes.add(context); - index++; - } + this.nameservices = new ArrayList<>(); + this.namenodes = new ArrayList<>(); - if (numNamenodes > 2) { - context = new NamenodeContext(nnConf, ns, NAMENODE3, index + 1); - namenodes.add(context); - index++; - } + NamenodeContext context = null; + int nnIndex = 0; + for (int i=0; i<numNameservices; i++) { + String ns = "ns" + i; + this.nameservices.add("ns" + i); + Configuration nnConf = generateNamenodeConfiguration(ns); + if (!highAvailability) { + context = new NamenodeContext(nnConf, ns, null, nnIndex++); + this.namenodes.add(context); } else { - NamenodeContext context = new NamenodeContext(nnConf, ns, null, index); - namenodes.add(context); - index++; + for (int j=0; j<numNamenodes; j++) { + context = new NamenodeContext(nnConf, ns, NAMENODES[j], nnIndex++); + this.namenodes.add(context); + } } } } public String getNameservicesKey() { - StringBuilder ns = new StringBuilder(); - for (int i = 0; i < nameservices.size(); i++) { - if (i > 0) { - ns.append(","); + StringBuilder sb = new StringBuilder(); + for (String nsId : this.nameservices) { + if (sb.length() > 0) { + sb.append(","); } - ns.append(nameservices.get(i)); + sb.append(nsId); } - return ns.toString(); + return sb.toString(); } public String getRandomNameservice() { Random r = new Random(); - return nameservices.get(r.nextInt(nameservices.size())); + int randIndex = r.nextInt(nameservices.size()); + return nameservices.get(randIndex); } public List<String> getNameservices() { @@ -415,7 +473,7 @@ public class RouterDFSCluster { } public List<NamenodeContext> getNamenodes(String nameservice) { - ArrayList<NamenodeContext> nns = new ArrayList<NamenodeContext>(); + List<NamenodeContext> nns = new ArrayList<>(); for (NamenodeContext c : namenodes) { if (c.nameserviceId.equals(nameservice)) { nns.add(c); @@ -426,23 +484,23 @@ public class RouterDFSCluster { public NamenodeContext getRandomNamenode() { Random rand = new Random(); - return namenodes.get(rand.nextInt(namenodes.size())); + int i = rand.nextInt(this.namenodes.size()); + return this.namenodes.get(i); } public List<NamenodeContext> getNamenodes() { - return namenodes; + return this.namenodes; } public boolean isHighAvailability() { return highAvailability; } - public NamenodeContext getNamenode(String nameservice, - String namenode) { - for (NamenodeContext c : namenodes) { + public NamenodeContext getNamenode(String nameservice, String namenode) { + for (NamenodeContext c : this.namenodes) { if (c.nameserviceId.equals(nameservice)) { - if (namenode == null || c.namenodeId == null || namenode.isEmpty() - || c.namenodeId.isEmpty()) { + if (namenode == null || namenode.isEmpty() || + c.namenodeId == null || c.namenodeId.isEmpty()) { return c; } else if (c.namenodeId.equals(namenode)) { return c; @@ -453,7 +511,7 @@ public class RouterDFSCluster { } public List<RouterContext> getRouters(String nameservice) { - ArrayList<RouterContext> nns = new ArrayList<RouterContext>(); + List<RouterContext> nns = new ArrayList<>(); for (RouterContext c : routers) { if (c.nameserviceId.equals(nameservice)) { nns.add(c); @@ -462,14 +520,13 @@ public class RouterDFSCluster { return nns; } - public RouterContext getRouterContext(String nameservice, - String namenode) { + public RouterContext getRouterContext(String nsId, String nnId) { for (RouterContext c : routers) { - if (namenode == null) { + if (nnId == null) { return c; } - if (c.namenodeId.equals(namenode) - && c.nameserviceId.equals(nameservice)) { + if (c.namenodeId.equals(nnId) && + c.nameserviceId.equals(nsId)) { return c; } } @@ -485,10 +542,10 @@ public class RouterDFSCluster { return routers; } - public RouterContext buildRouter(String nameservice, String namenode) + public RouterContext buildRouter(String nsId, String nnId) throws URISyntaxException, IOException { - Configuration config = generateRouterConfiguration(nameservice, namenode); - RouterContext rc = new RouterContext(config, nameservice, namenode); + Configuration config = generateRouterConfiguration(nsId, nnId); + RouterContext rc = new RouterContext(config, nsId, nnId); return rc; } @@ -500,10 +557,9 @@ public class RouterDFSCluster { try { MiniDFSNNTopology topology = new MiniDFSNNTopology(); for (String ns : nameservices) { - NSConf conf = new MiniDFSNNTopology.NSConf(ns); if (highAvailability) { - for(int i = 0; i < namenodes.size()/nameservices.size(); i++) { + for (int i=0; i<namenodes.size()/nameservices.size(); i++) { NNConf nnConf = new MiniDFSNNTopology.NNConf("nn" + i); conf.addNN(nnConf); } @@ -516,11 +572,15 @@ public class RouterDFSCluster { topology.setFederation(true); // Start mini DFS cluster - Configuration nnConf = generateNamenodeConfiguration(nameservices.get(0)); + String ns0 = nameservices.get(0); + Configuration nnConf = generateNamenodeConfiguration(ns0); if (overrideConf != null) { nnConf.addResource(overrideConf); } - cluster = new MiniDFSCluster.Builder(nnConf).nnTopology(topology).build(); + cluster = new MiniDFSCluster.Builder(nnConf) + .numDataNodes(nameservices.size()*2) + .nnTopology(topology) + .build(); cluster.waitActive(); // Store NN pointers @@ -530,28 +590,32 @@ public class RouterDFSCluster { } } catch (Exception e) { - LOG.error("Cannot start Router DFS cluster: " + e.getMessage(), e); - cluster.shutdown(); + LOG.error("Cannot start Router DFS cluster: {}", e.getMessage(), e); + if (cluster != null) { + cluster.shutdown(); + } } } public void startRouters() throws InterruptedException, URISyntaxException, IOException { - // Create routers - routers = new ArrayList<RouterContext>(); - for (String ns : nameservices) { + // Create one router per nameservice + this.routers = new ArrayList<>(); + for (String ns : this.nameservices) { for (NamenodeContext context : getNamenodes(ns)) { - routers.add(buildRouter(ns, context.namenodeId)); + RouterContext router = buildRouter(ns, context.namenodeId); + this.routers.add(router); } } // Start all routers - for (RouterContext router : routers) { + for (RouterContext router : this.routers) { router.router.start(); } + // Wait until all routers are active and record their ports - for (RouterContext router : routers) { + for (RouterContext router : this.routers) { waitActive(router); router.initRouter(); } @@ -570,22 +634,21 @@ public class RouterDFSCluster { } Thread.sleep(1000); } - assertFalse( - "Timeout waiting for " + router.router.toString() + " to activate.", - true); + fail("Timeout waiting for " + router.router + " to activate"); } - public void registerNamenodes() throws IOException { - for (RouterContext r : routers) { + for (RouterContext r : this.routers) { ActiveNamenodeResolver resolver = r.router.getNamenodeResolver(); - for (NamenodeContext nn : namenodes) { + for (NamenodeContext nn : this.namenodes) { // Generate a report - NamenodeStatusReport report = new NamenodeStatusReport(nn.nameserviceId, - nn.namenodeId, nn.getRpcAddress(), nn.getServiceAddress(), + NamenodeStatusReport report = new NamenodeStatusReport( + nn.nameserviceId, nn.namenodeId, + nn.getRpcAddress(), nn.getServiceAddress(), nn.getLifelineAddress(), nn.getHttpAddress()); - report.setNamespaceInfo(nn.namenode.getNamesystem().getFSImage() - .getStorage().getNamespaceInfo()); + FSImage fsImage = nn.namenode.getNamesystem().getFSImage(); + NamespaceInfo nsInfo = fsImage.getStorage().getNamespaceInfo(); + report.setNamespaceInfo(nsInfo); // Determine HA state from nn public state string String nnState = nn.namenode.getState(); @@ -606,74 +669,97 @@ public class RouterDFSCluster { public void waitNamenodeRegistration() throws InterruptedException, IllegalStateException, IOException { - for (RouterContext r : routers) { - for (NamenodeContext nn : namenodes) { - FederationTestUtils.waitNamenodeRegistered( - r.router.getNamenodeResolver(), nn.nameserviceId, nn.namenodeId, - null); + for (RouterContext r : this.routers) { + Router router = r.router; + for (NamenodeContext nn : this.namenodes) { + ActiveNamenodeResolver nnResolver = router.getNamenodeResolver(); + waitNamenodeRegistered( + nnResolver, nn.nameserviceId, nn.namenodeId, null); } } } public void waitRouterRegistrationQuorum(RouterContext router, - FederationNamenodeServiceState state, String nameservice, String namenode) + FederationNamenodeServiceState state, String nsId, String nnId) throws InterruptedException, IOException { - LOG.info("Waiting for NN - " + nameservice + ":" + namenode - + " to transition to state - " + state); - FederationTestUtils.waitNamenodeRegistered( - router.router.getNamenodeResolver(), nameservice, namenode, state); + LOG.info("Waiting for NN {} {} to transition to {}", nsId, nnId, state); + ActiveNamenodeResolver nnResolver = router.router.getNamenodeResolver(); + waitNamenodeRegistered(nnResolver, nsId, nnId, state); } - public String getFederatedPathForNameservice(String ns) { - return "/" + ns; + /** + * Get the federated path for a nameservice. + * @param nsId Nameservice identifier. + * @return Path in the Router. + */ + public String getFederatedPathForNS(String nsId) { + return "/" + nsId; } - public String getNamenodePathForNameservice(String ns) { - return "/target-" + ns; + /** + * Get the namenode path for a nameservice. + * @param nsId Nameservice identifier. + * @return Path in the Namenode. + */ + public String getNamenodePathForNS(String nsId) { + return "/target-" + nsId; } /** - * @return example: + * Get the federated test directory for a nameservice. + * @param nsId Nameservice identifier. + * @return Example: * <ul> * <li>/ns0/testdir which maps to ns0->/target-ns0/testdir * </ul> */ - public String getFederatedTestDirectoryForNameservice(String ns) { - return getFederatedPathForNameservice(ns) + "/" + TEST_DIR; + public String getFederatedTestDirectoryForNS(String nsId) { + return getFederatedPathForNS(nsId) + "/" + TEST_DIR; } /** + * Get the namenode test directory for a nameservice. + * @param nsId Nameservice identifier. * @return example: * <ul> * <li>/target-ns0/testdir * </ul> */ - public String getNamenodeTestDirectoryForNameservice(String ns) { - return getNamenodePathForNameservice(ns) + "/" + TEST_DIR; + public String getNamenodeTestDirectoryForNS(String nsId) { + return getNamenodePathForNS(nsId) + "/" + TEST_DIR; } /** + * Get the federated test file for a nameservice. + * @param nsId Nameservice identifier. * @return example: * <ul> * <li>/ns0/testfile which maps to ns0->/target-ns0/testfile * </ul> */ - public String getFederatedTestFileForNameservice(String ns) { - return getFederatedPathForNameservice(ns) + "/" + TEST_FILE; + public String getFederatedTestFileForNS(String nsId) { + return getFederatedPathForNS(nsId) + "/" + TEST_FILE; } /** + * Get the namenode test file for a nameservice. + * @param nsId Nameservice identifier. * @return example: * <ul> * <li>/target-ns0/testfile * </ul> */ - public String getNamenodeTestFileForNameservice(String ns) { - return getNamenodePathForNameservice(ns) + "/" + TEST_FILE; + public String getNamenodeTestFileForNS(String nsId) { + return getNamenodePathForNS(nsId) + "/" + TEST_FILE; } + /** + * Stop the federated HDFS cluster. + */ public void shutdown() { - cluster.shutdown(); + if (cluster != null) { + cluster.shutdown(); + } if (routers != null) { for (RouterContext context : routers) { stopRouter(context); @@ -681,9 +767,12 @@ public class RouterDFSCluster { } } + /** + * Stop a router. + * @param router Router context. + */ public void stopRouter(RouterContext router) { try { - router.router.shutDown(); int loopCount = 0; @@ -691,7 +780,7 @@ public class RouterDFSCluster { loopCount++; Thread.sleep(1000); if (loopCount > 20) { - LOG.error("Unable to shutdown router - " + router.rpcPort); + LOG.error("Cannot shutdown router {}", router.rpcPort); break; } } @@ -714,26 +803,28 @@ public class RouterDFSCluster { for (String ns : getNameservices()) { NamenodeContext context = getNamenode(ns, null); if (!createTestDirectoriesNamenode(context)) { - throw new IOException("Unable to create test directory for ns - " + ns); + throw new IOException("Cannot create test directory for ns " + ns); } } } public boolean createTestDirectoriesNamenode(NamenodeContext nn) throws IOException { - return FederationTestUtils.addDirectory(nn.getFileSystem(), - getNamenodeTestDirectoryForNameservice(nn.nameserviceId)); + FileSystem fs = nn.getFileSystem(); + String testDir = getNamenodeTestDirectoryForNS(nn.nameserviceId); + return addDirectory(fs, testDir); } public void deleteAllFiles() throws IOException { // Delete all files via the NNs and verify for (NamenodeContext context : getNamenodes()) { - FileStatus[] status = context.getFileSystem().listStatus(new Path("/")); - for(int i = 0; i <status.length; i++) { + FileSystem fs = context.getFileSystem(); + FileStatus[] status = fs.listStatus(new Path("/")); + for (int i = 0; i <status.length; i++) { Path p = status[i].getPath(); - context.getFileSystem().delete(p, true); + fs.delete(p, true); } - status = context.getFileSystem().listStatus(new Path("/")); + status = fs.listStatus(new Path("/")); assertEquals(status.length, 0); } } @@ -754,14 +845,34 @@ public class RouterDFSCluster { MockResolver resolver = (MockResolver) r.router.getSubclusterResolver(); // create table entries - for (String ns : nameservices) { + for (String nsId : nameservices) { // Direct path - resolver.addLocation(getFederatedPathForNameservice(ns), ns, - getNamenodePathForNameservice(ns)); + String routerPath = getFederatedPathForNS(nsId); + String nnPath = getNamenodePathForNS(nsId); + resolver.addLocation(routerPath, nsId, nnPath); } - // Root path goes to both NS1 - resolver.addLocation("/", nameservices.get(0), "/"); + // Root path points to both first nameservice + String ns0 = nameservices.get(0); + resolver.addLocation("/", ns0, "/"); + } + } + + public MiniDFSCluster getCluster() { + return cluster; + } + + /** + * Wait until the federated cluster is up and ready. + * @throws IOException If we cannot wait for the cluster to be up. + */ + public void waitClusterUp() throws IOException { + cluster.waitClusterUp(); + registerNamenodes(); + try { + waitNamenodeRegistration(); + } catch (Exception e) { + throw new IOException("Cannot wait for the namenodes", e); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a1a5fe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java index 8c720c7..d8afb39 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.federation.router; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import java.io.IOException; import java.net.URISyntaxException; @@ -51,6 +52,10 @@ public class TestRouter { conf.set(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS, MockResolver.class.getCanonicalName()); + // Bind to any available port + conf.set(DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0"); + conf.set(DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0"); + // Simulate a co-located NN conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns0"); conf.set("fs.defaultFS", "hdfs://" + "ns0"); @@ -90,7 +95,31 @@ public class TestRouter { @Test public void testRouterService() throws InterruptedException, IOException { + // Rpc only + testRouterStartup(new RouterConfigBuilder(conf).rpc().build()); + // Run with all services - testRouterStartup((new RouterConfigBuilder(conf)).build()); + testRouterStartup(new RouterConfigBuilder(conf).all().build()); + } + + @Test + public void testRouterRestartRpcService() throws IOException { + + // Start + Router router = new Router(); + router.init(new RouterConfigBuilder(conf).rpc().build()); + router.start(); + + // Verify RPC server is running + assertNotNull(router.getRpcServerAddress()); + RouterRpcServer rpcServer = router.getRpcServer(); + assertNotNull(rpcServer); + assertEquals(STATE.STARTED, rpcServer.getServiceState()); + + // Stop router and RPC server + router.stop(); + assertEquals(STATE.STOPPED, rpcServer.getServiceState()); + + router.close(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a1a5fe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java new file mode 100644 index 0000000..af506c9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -0,0 +1,869 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.addDirectory; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.countContents; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.deleteFile; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getFileStatus; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists; +import static org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.TEST_STRING; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.CryptoProtocolVersion; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.Service.STATE; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * The the RPC interface of the {@link Router} implemented by + * {@link RouterRpcServer}. + */ +public class TestRouterRpc { + + /** Federated HDFS cluster. */ + private static RouterDFSCluster cluster; + + /** Random Router for this federated cluster. */ + private RouterContext router; + + /** Random nameservice in the federated cluster. */ + private String ns; + /** First namenode in the nameservice. */ + private NamenodeContext namenode; + + /** Client interface to the Router. */ + private ClientProtocol routerProtocol; + /** Client interface to the Namenode. */ + private ClientProtocol nnProtocol; + + /** Filesystem interface to the Router. */ + private FileSystem routerFS; + /** Filesystem interface to the Namenode. */ + private FileSystem nnFS; + + /** File in the Router. */ + private String routerFile; + /** File in the Namenode. */ + private String nnFile; + + + @BeforeClass + public static void globalSetUp() throws Exception { + cluster = new RouterDFSCluster(false, 2); + + // Start NNs and DNs and wait until ready + cluster.startCluster(); + + // Start routers with only an RPC service + cluster.addRouterOverrides((new RouterConfigBuilder()).rpc().build()); + cluster.startRouters(); + + // Register and verify all NNs with all routers + cluster.registerNamenodes(); + cluster.waitNamenodeRegistration(); + } + + @AfterClass + public static void tearDown() { + cluster.shutdown(); + } + + @Before + public void testSetup() throws Exception { + + // Create mock locations + cluster.installMockLocations(); + + // Delete all files via the NNs and verify + cluster.deleteAllFiles(); + + // Create test fixtures on NN + cluster.createTestDirectoriesNamenode(); + + // Wait to ensure NN has fully created its test directories + Thread.sleep(100); + + // Pick a NS, namenode and router for this test + this.router = cluster.getRandomRouter(); + this.ns = cluster.getRandomNameservice(); + this.namenode = cluster.getNamenode(ns, null); + + // Handles to the ClientProtocol interface + this.routerProtocol = router.getClient().getNamenode(); + this.nnProtocol = namenode.getClient().getNamenode(); + + // Handles to the filesystem client + this.nnFS = namenode.getFileSystem(); + this.routerFS = router.getFileSystem(); + + // Create a test file on the NN + Random r = new Random(); + String randomFile = "testfile-" + r.nextInt(); + this.nnFile = + cluster.getNamenodeTestDirectoryForNS(ns) + "/" + randomFile; + this.routerFile = + cluster.getFederatedTestDirectoryForNS(ns) + "/" + randomFile; + + createFile(nnFS, nnFile, 32); + verifyFileExists(nnFS, nnFile); + } + + @Test + public void testRpcService() throws IOException { + Router testRouter = new Router(); + List<String> nss = cluster.getNameservices(); + String ns0 = nss.get(0); + Configuration routerConfig = cluster.generateRouterConfiguration(ns0, null); + RouterRpcServer server = new RouterRpcServer(routerConfig, testRouter, + testRouter.getNamenodeResolver(), testRouter.getSubclusterResolver()); + server.init(routerConfig); + assertEquals(STATE.INITED, server.getServiceState()); + server.start(); + assertEquals(STATE.STARTED, server.getServiceState()); + server.stop(); + assertEquals(STATE.STOPPED, server.getServiceState()); + server.close(); + testRouter.close(); + } + + protected RouterDFSCluster getCluster() { + return TestRouterRpc.cluster; + } + + protected RouterContext getRouterContext() { + return this.router; + } + + protected void setRouter(RouterContext r) + throws IOException, URISyntaxException { + this.router = r; + this.routerProtocol = r.getClient().getNamenode(); + this.routerFS = r.getFileSystem(); + } + + protected FileSystem getRouterFileSystem() { + return this.routerFS; + } + + protected FileSystem getNamenodeFileSystem() { + return this.nnFS; + } + + protected ClientProtocol getRouterProtocol() { + return this.routerProtocol; + } + + protected ClientProtocol getNamenodeProtocol() { + return this.nnProtocol; + } + + protected NamenodeContext getNamenode() { + return this.namenode; + } + + protected void setNamenodeFile(String filename) { + this.nnFile = filename; + } + + protected String getNamenodeFile() { + return this.nnFile; + } + + protected void setRouterFile(String filename) { + this.routerFile = filename; + } + + protected String getRouterFile() { + return this.routerFile; + } + + protected void setNamenode(NamenodeContext nn) + throws IOException, URISyntaxException { + this.namenode = nn; + this.nnProtocol = nn.getClient().getNamenode(); + this.nnFS = nn.getFileSystem(); + } + + protected String getNs() { + return this.ns; + } + + protected void setNs(String nameservice) { + this.ns = nameservice; + } + + protected static void compareResponses( + ClientProtocol protocol1, ClientProtocol protocol2, + Method m, Object[] paramList) { + + Object return1 = null; + Exception exception1 = null; + try { + return1 = m.invoke(protocol1, paramList); + } catch (Exception ex) { + exception1 = ex; + } + + Object return2 = null; + Exception exception2 = null; + try { + return2 = m.invoke(protocol2, paramList); + } catch (Exception ex) { + exception2 = ex; + } + + assertEquals(return1, return2); + if (exception1 == null && exception2 == null) { + return; + } + + assertEquals( + exception1.getCause().getClass(), + exception2.getCause().getClass()); + } + + @Test + public void testProxyListFiles() throws IOException, InterruptedException, + URISyntaxException, NoSuchMethodException, SecurityException { + + // Verify that the root listing is a union of the mount table destinations + // and the files stored at all nameservices mounted at the root (ns0 + ns1) + // + // / --> + // /ns0 (from mount table) + // /ns1 (from mount table) + // all items in / of ns0 (default NS) + + // Collect the mount table entries from the root mount point + Set<String> requiredPaths = new TreeSet<>(); + FileSubclusterResolver fileResolver = + router.getRouter().getSubclusterResolver(); + for (String mount : fileResolver.getMountPoints("/")) { + requiredPaths.add(mount); + } + + // Collect all files/dirs on the root path of the default NS + String defaultNs = cluster.getNameservices().get(0); + NamenodeContext nn = cluster.getNamenode(defaultNs, null); + FileStatus[] iterator = nn.getFileSystem().listStatus(new Path("/")); + for (FileStatus file : iterator) { + requiredPaths.add(file.getPath().getName()); + } + + // Fetch listing + DirectoryListing listing = + routerProtocol.getListing("/", HdfsFileStatus.EMPTY_NAME, false); + Iterator<String> requiredPathsIterator = requiredPaths.iterator(); + // Match each path returned and verify order returned + for(HdfsFileStatus f : listing.getPartialListing()) { + String fileName = requiredPathsIterator.next(); + String currentFile = f.getFullPath(new Path("/")).getName(); + assertEquals(currentFile, fileName); + } + + // Verify the total number of results found/matched + assertEquals(requiredPaths.size(), listing.getPartialListing().length); + + // List a path that doesn't exist and validate error response with NN + // behavior. + Method m = ClientProtocol.class.getMethod( + "getListing", String.class, byte[].class, boolean.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, HdfsFileStatus.EMPTY_NAME, false}); + } + + @Test + public void testProxyListFilesWithConflict() + throws IOException, InterruptedException { + + // Add a directory to the namespace that conflicts with a mount point + NamenodeContext nn = cluster.getNamenode(ns, null); + FileSystem nnFs = nn.getFileSystem(); + addDirectory(nnFs, cluster.getFederatedTestDirectoryForNS(ns)); + + FileSystem routerFs = router.getFileSystem(); + int initialCount = countContents(routerFs, "/"); + + // Root file system now for NS X: + // / -> + // /ns0 (mount table) + // /ns1 (mount table) + // /target-ns0 (the target folder for the NS0 mapped to / + // /nsX (local directory that duplicates mount table) + int newCount = countContents(routerFs, "/"); + assertEquals(initialCount, newCount); + + // Verify that each root path is readable and contains one test directory + assertEquals(1, countContents(routerFs, cluster.getFederatedPathForNS(ns))); + + // Verify that real folder for the ns contains a single test directory + assertEquals(1, countContents(nnFs, cluster.getNamenodePathForNS(ns))); + + } + + protected void testRename(RouterContext testRouter, String filename, + String renamedFile, boolean exceptionExpected) throws IOException { + + createFile(testRouter.getFileSystem(), filename, 32); + // verify + verifyFileExists(testRouter.getFileSystem(), filename); + // rename + boolean exceptionThrown = false; + try { + DFSClient client = testRouter.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + clientProtocol.rename(filename, renamedFile); + } catch (Exception ex) { + exceptionThrown = true; + } + if (exceptionExpected) { + // Error was expected + assertTrue(exceptionThrown); + FileContext fileContext = testRouter.getFileContext(); + assertTrue(fileContext.delete(new Path(filename), true)); + } else { + // No error was expected + assertFalse(exceptionThrown); + // verify + assertTrue(verifyFileExists(testRouter.getFileSystem(), renamedFile)); + // delete + FileContext fileContext = testRouter.getFileContext(); + assertTrue(fileContext.delete(new Path(renamedFile), true)); + } + } + + protected void testRename2(RouterContext testRouter, String filename, + String renamedFile, boolean exceptionExpected) throws IOException { + createFile(testRouter.getFileSystem(), filename, 32); + // verify + verifyFileExists(testRouter.getFileSystem(), filename); + // rename + boolean exceptionThrown = false; + try { + DFSClient client = testRouter.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + clientProtocol.rename2(filename, renamedFile, new Options.Rename[] {}); + } catch (Exception ex) { + exceptionThrown = true; + } + assertEquals(exceptionExpected, exceptionThrown); + if (exceptionExpected) { + // Error was expected + FileContext fileContext = testRouter.getFileContext(); + assertTrue(fileContext.delete(new Path(filename), true)); + } else { + // verify + assertTrue(verifyFileExists(testRouter.getFileSystem(), renamedFile)); + // delete + FileContext fileContext = testRouter.getFileContext(); + assertTrue(fileContext.delete(new Path(renamedFile), true)); + } + } + + @Test + public void testProxyRenameFiles() throws IOException, InterruptedException { + + Thread.sleep(5000); + List<String> nss = cluster.getNameservices(); + String ns0 = nss.get(0); + String ns1 = nss.get(1); + + // Rename within the same namespace + // /ns0/testdir/testrename -> /ns0/testdir/testrename-append + String filename = + cluster.getFederatedTestDirectoryForNS(ns0) + "/testrename"; + String renamedFile = filename + "-append"; + testRename(router, filename, renamedFile, false); + testRename2(router, filename, renamedFile, false); + + // Rename a file to a destination that is in a different namespace (fails) + filename = cluster.getFederatedTestDirectoryForNS(ns0) + "/testrename"; + renamedFile = cluster.getFederatedTestDirectoryForNS(ns1) + "/testrename"; + testRename(router, filename, renamedFile, true); + testRename2(router, filename, renamedFile, true); + } + + @Test + public void testProxyChownFiles() throws Exception { + + String newUsername = "TestUser"; + String newGroup = "TestGroup"; + + // change owner + routerProtocol.setOwner(routerFile, newUsername, newGroup); + + // Verify with NN + FileStatus file = getFileStatus(namenode.getFileSystem(), nnFile); + assertEquals(file.getOwner(), newUsername); + assertEquals(file.getGroup(), newGroup); + + // Bad request and validate router response matches NN response. + Method m = ClientProtocol.class.getMethod("setOwner", String.class, + String.class, String.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, newUsername, newGroup}); + } + + @Test + public void testProxyGetStats() throws Exception { + + long[] combinedData = routerProtocol.getStats(); + + long[] individualData = new long[10]; + for (String nameservice : cluster.getNameservices()) { + NamenodeContext n = cluster.getNamenode(nameservice, null); + DFSClient client = n.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + long[] data = clientProtocol.getStats(); + for (int i = 0; i < data.length; i++) { + individualData[i] += data[i]; + } + assert(data.length == combinedData.length); + } + + for (int i = 0; i < combinedData.length && i < individualData.length; i++) { + if (i == ClientProtocol.GET_STATS_REMAINING_IDX) { + // Skip available storage as this fluctuates in mini cluster + continue; + } + assertEquals(combinedData[i], individualData[i]); + } + } + + @Test + public void testProxyGetDatanodeReport() throws Exception { + + DatanodeInfo[] combinedData = + routerProtocol.getDatanodeReport(DatanodeReportType.ALL); + + Set<Integer> individualData = new HashSet<Integer>(); + for (String nameservice : cluster.getNameservices()) { + NamenodeContext n = cluster.getNamenode(nameservice, null); + DFSClient client = n.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + DatanodeInfo[] data = + clientProtocol.getDatanodeReport(DatanodeReportType.ALL); + for (int i = 0; i < data.length; i++) { + // Collect unique DNs based on their xfer port + DatanodeInfo info = data[i]; + individualData.add(info.getXferPort()); + } + } + assertEquals(combinedData.length, individualData.size()); + } + + @Test + public void testProxyGetDatanodeStorageReport() + throws IOException, InterruptedException, URISyntaxException { + + DatanodeStorageReport[] combinedData = + routerProtocol.getDatanodeStorageReport(DatanodeReportType.ALL); + + Set<String> individualData = new HashSet<>(); + for (String nameservice : cluster.getNameservices()) { + NamenodeContext n = cluster.getNamenode(nameservice, null); + DFSClient client = n.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + DatanodeStorageReport[] data = + clientProtocol.getDatanodeStorageReport(DatanodeReportType.ALL); + for (DatanodeStorageReport report : data) { + // Determine unique DN instances + DatanodeInfo dn = report.getDatanodeInfo(); + individualData.add(dn.toString()); + } + } + assertEquals(combinedData.length, individualData.size()); + } + + @Test + public void testProxyMkdir() throws Exception { + + // Check the initial folders + FileStatus[] filesInitial = routerFS.listStatus(new Path("/")); + + // Create a directory via the router at the root level + String dirPath = "/testdir"; + FsPermission permission = new FsPermission("705"); + routerProtocol.mkdirs(dirPath, permission, false); + + // Verify the root listing has the item via the router + FileStatus[] files = routerFS.listStatus(new Path("/")); + assertEquals(Arrays.toString(files) + " should be " + + Arrays.toString(filesInitial) + " + " + dirPath, + filesInitial.length + 1, files.length); + assertTrue(verifyFileExists(routerFS, dirPath)); + + // Verify the directory is present in only 1 Namenode + int foundCount = 0; + for (NamenodeContext n : cluster.getNamenodes()) { + if (verifyFileExists(n.getFileSystem(), dirPath)) { + foundCount++; + } + } + assertEquals(1, foundCount); + assertTrue(deleteFile(routerFS, dirPath)); + + // Validate router failure response matches NN failure response. + Method m = ClientProtocol.class.getMethod("mkdirs", String.class, + FsPermission.class, boolean.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, permission, false}); + } + + @Test + public void testProxyChmodFiles() throws Exception { + + FsPermission permission = new FsPermission("444"); + + // change permissions + routerProtocol.setPermission(routerFile, permission); + + // Validate permissions NN + FileStatus file = getFileStatus(namenode.getFileSystem(), nnFile); + assertEquals(permission, file.getPermission()); + + // Validate router failure response matches NN failure response. + Method m = ClientProtocol.class.getMethod( + "setPermission", String.class, FsPermission.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, permission}); + } + + @Test + public void testProxySetReplication() throws Exception { + + // Check current replication via NN + FileStatus file = getFileStatus(nnFS, nnFile); + assertEquals(1, file.getReplication()); + + // increment replication via router + routerProtocol.setReplication(routerFile, (short) 2); + + // Verify via NN + file = getFileStatus(nnFS, nnFile); + assertEquals(2, file.getReplication()); + + // Validate router failure response matches NN failure response. + Method m = ClientProtocol.class.getMethod( + "setReplication", String.class, short.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, (short) 2}); + } + + @Test + public void testProxyTruncateFile() throws Exception { + + // Check file size via NN + FileStatus file = getFileStatus(nnFS, nnFile); + assertTrue(file.getLen() > 0); + + // Truncate to 0 bytes via router + routerProtocol.truncate(routerFile, 0, "testclient"); + + // Verify via NN + file = getFileStatus(nnFS, nnFile); + assertEquals(0, file.getLen()); + + // Validate router failure response matches NN failure response. + Method m = ClientProtocol.class.getMethod( + "truncate", String.class, long.class, String.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, (long) 0, "testclient"}); + } + + @Test + public void testProxyGetBlockLocations() throws Exception { + + // Fetch block locations via router + LocatedBlocks locations = + routerProtocol.getBlockLocations(routerFile, 0, 1024); + assertEquals(1, locations.getLocatedBlocks().size()); + + // Validate router failure response matches NN failure response. + Method m = ClientProtocol.class.getMethod( + "getBlockLocations", String.class, long.class, long.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, + m, new Object[] {badPath, (long) 0, (long) 0}); + } + + @Test + public void testProxyStoragePolicy() throws Exception { + + // Query initial policy via NN + HdfsFileStatus status = namenode.getClient().getFileInfo(nnFile); + + // Set a random policy via router + BlockStoragePolicy[] policies = namenode.getClient().getStoragePolicies(); + BlockStoragePolicy policy = policies[0]; + + while (policy.isCopyOnCreateFile()) { + // Pick a non copy on create policy + Random rand = new Random(); + int randIndex = rand.nextInt(policies.length); + policy = policies[randIndex]; + } + routerProtocol.setStoragePolicy(routerFile, policy.getName()); + + // Verify policy via NN + HdfsFileStatus newStatus = namenode.getClient().getFileInfo(nnFile); + assertTrue(newStatus.getStoragePolicy() == policy.getId()); + assertTrue(newStatus.getStoragePolicy() != status.getStoragePolicy()); + + // Validate router failure response matches NN failure response. + Method m = ClientProtocol.class.getMethod("setStoragePolicy", String.class, + String.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, + m, new Object[] {badPath, "badpolicy"}); + } + + @Test + public void testProxyGetPreferedBlockSize() throws Exception { + + // Query via NN and Router and verify + long namenodeSize = nnProtocol.getPreferredBlockSize(nnFile); + long routerSize = routerProtocol.getPreferredBlockSize(routerFile); + assertEquals(routerSize, namenodeSize); + + // Validate router failure response matches NN failure response. + Method m = ClientProtocol.class.getMethod( + "getPreferredBlockSize", String.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses( + routerProtocol, nnProtocol, m, new Object[] {badPath}); + } + + private void testConcat( + String source, String target, boolean failureExpected) { + boolean failure = false; + try { + // Concat test file with fill block length file via router + routerProtocol.concat(target, new String[] {source}); + } catch (IOException ex) { + failure = true; + } + assertEquals(failureExpected, failure); + } + + @Test + public void testProxyConcatFile() throws Exception { + + // Create a stub file in the primary ns + String sameNameservice = ns; + String existingFile = + cluster.getFederatedTestDirectoryForNS(sameNameservice) + + "_concatfile"; + int existingFileSize = 32; + createFile(routerFS, existingFile, existingFileSize); + + // Identify an alternate nameservice that doesn't match the existing file + String alternateNameservice = null; + for (String n : cluster.getNameservices()) { + if (!n.equals(sameNameservice)) { + alternateNameservice = n; + break; + } + } + + // Create new files, must be a full block to use concat. One file is in the + // same namespace as the target file, the other is in a different namespace. + String altRouterFile = + cluster.getFederatedTestDirectoryForNS(alternateNameservice) + + "_newfile"; + String sameRouterFile = + cluster.getFederatedTestDirectoryForNS(sameNameservice) + + "_newfile"; + createFile(routerFS, altRouterFile, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT); + createFile(routerFS, sameRouterFile, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT); + + // Concat in different namespaces, fails + testConcat(existingFile, altRouterFile, true); + + // Concat in same namespaces, succeeds + testConcat(existingFile, sameRouterFile, false); + + // Check target file length + FileStatus status = getFileStatus(routerFS, sameRouterFile); + assertEquals( + existingFileSize + DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, + status.getLen()); + + // Validate router failure response matches NN failure response. + Method m = ClientProtocol.class.getMethod( + "concat", String.class, String[].class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, new String[] {routerFile}}); + } + + @Test + public void testProxyAppend() throws Exception { + + // Append a test string via router + EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.APPEND); + DFSClient routerClient = getRouterContext().getClient(); + HdfsDataOutputStream stream = + routerClient.append(routerFile, 1024, createFlag, null, null); + stream.writeBytes(TEST_STRING); + stream.close(); + + // Verify file size via NN + FileStatus status = getFileStatus(nnFS, nnFile); + assertTrue(status.getLen() > TEST_STRING.length()); + + // Validate router failure response matches NN failure response. + Method m = ClientProtocol.class.getMethod("append", String.class, + String.class, EnumSetWritable.class); + String badPath = "/unknownlocation/unknowndir"; + EnumSetWritable<CreateFlag> createFlagWritable = + new EnumSetWritable<CreateFlag>(createFlag); + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, "testClient", createFlagWritable}); + } + + @Test + public void testProxyGetAdditionalDatanode() + throws IOException, InterruptedException, URISyntaxException { + + // Use primitive APIs to open a file, add a block, and get datanode location + EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.CREATE); + String clientName = getRouterContext().getClient().getClientName(); + String newRouterFile = routerFile + "_additionalDatanode"; + HdfsFileStatus status = routerProtocol.create( + newRouterFile, new FsPermission("777"), clientName, + new EnumSetWritable<CreateFlag>(createFlag), true, (short) 1, + (long) 1024, CryptoProtocolVersion.supported(), null); + + // Add a block via router (requires client to have same lease) + LocatedBlock block = routerProtocol.addBlock( + newRouterFile, clientName, null, null, + status.getFileId(), null, null); + + DatanodeInfo[] exclusions = new DatanodeInfo[0]; + LocatedBlock newBlock = routerProtocol.getAdditionalDatanode( + newRouterFile, status.getFileId(), block.getBlock(), + block.getLocations(), block.getStorageIDs(), exclusions, 1, clientName); + assertNotNull(newBlock); + } + + @Test + public void testProxyCreateFileAlternateUser() + throws IOException, URISyntaxException, InterruptedException { + + // Create via Router + String routerDir = cluster.getFederatedTestDirectoryForNS(ns); + String namenodeDir = cluster.getNamenodeTestDirectoryForNS(ns); + String newRouterFile = routerDir + "/unknownuser"; + String newNamenodeFile = namenodeDir + "/unknownuser"; + String username = "unknownuser"; + + // Allow all user access to dir + namenode.getFileContext().setPermission( + new Path(namenodeDir), new FsPermission("777")); + + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(username); + DFSClient client = getRouterContext().getClient(ugi); + client.create(newRouterFile, true); + + // Fetch via NN and check user + FileStatus status = getFileStatus(nnFS, newNamenodeFile); + assertEquals(status.getOwner(), username); + } + + @Test + public void testProxyGetFileInfoAcessException() throws IOException { + + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser("unknownuser"); + + // List files from the NN and trap the exception + Exception nnFailure = null; + try { + String testFile = cluster.getNamenodeTestFileForNS(ns); + namenode.getClient(ugi).getLocatedBlocks(testFile, 0); + } catch (Exception e) { + nnFailure = e; + } + assertNotNull(nnFailure); + + // List files from the router and trap the exception + Exception routerFailure = null; + try { + String testFile = cluster.getFederatedTestFileForNS(ns); + getRouterContext().getClient(ugi).getLocatedBlocks(testFile, 0); + } catch (Exception e) { + routerFailure = e; + } + assertNotNull(routerFailure); + + assertEquals(routerFailure.getClass(), nnFailure.getClass()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/43a1a5fe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java new file mode 100644 index 0000000..5489691 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.federation.MockResolver; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; + +/** + * The the RPC interface of the {@link getRouter()} implemented by + * {@link RouterRpcServer}. + */ +public class TestRouterRpcMultiDestination extends TestRouterRpc { + + @Override + public void testSetup() throws Exception { + + RouterDFSCluster cluster = getCluster(); + + // Create mock locations + getCluster().installMockLocations(); + List<RouterContext> routers = cluster.getRouters(); + + // Add extra location to the root mount / such that the root mount points: + // / + // ns0 -> / + // ns1 -> / + for (RouterContext rc : routers) { + Router router = rc.getRouter(); + MockResolver resolver = (MockResolver) router.getSubclusterResolver(); + resolver.addLocation("/", cluster.getNameservices().get(1), "/"); + } + + // Create a mount that points to 2 dirs in the same ns: + // /same + // ns0 -> / + // ns0 -> /target-ns0 + for (RouterContext rc : routers) { + Router router = rc.getRouter(); + MockResolver resolver = (MockResolver) router.getSubclusterResolver(); + List<String> nss = cluster.getNameservices(); + String ns0 = nss.get(0); + resolver.addLocation("/same", ns0, "/"); + resolver.addLocation("/same", ns0, cluster.getNamenodePathForNS(ns0)); + } + + // Delete all files via the NNs and verify + cluster.deleteAllFiles(); + + // Create test fixtures on NN + cluster.createTestDirectoriesNamenode(); + + // Wait to ensure NN has fully created its test directories + Thread.sleep(100); + + // Pick a NS, namenode and getRouter() for this test + RouterContext router = cluster.getRandomRouter(); + this.setRouter(router); + + String ns = cluster.getRandomNameservice(); + this.setNs(ns); + this.setNamenode(cluster.getNamenode(ns, null)); + + // Create a test file on a single NN that is accessed via a getRouter() path + // with 2 destinations. All tests should failover to the alternate + // destination if the wrong NN is attempted first. + Random r = new Random(); + String randomString = "testfile-" + r.nextInt(); + setNamenodeFile("/" + randomString); + setRouterFile("/" + randomString); + + FileSystem nnFs = getNamenodeFileSystem(); + FileSystem routerFs = getRouterFileSystem(); + createFile(nnFs, getNamenodeFile(), 32); + + verifyFileExists(nnFs, getNamenodeFile()); + verifyFileExists(routerFs, getRouterFile()); + } + + private void testListing(String path) throws IOException { + + // Collect the mount table entries for this path + Set<String> requiredPaths = new TreeSet<>(); + RouterContext rc = getRouterContext(); + Router router = rc.getRouter(); + FileSubclusterResolver subclusterResolver = router.getSubclusterResolver(); + for (String mount : subclusterResolver.getMountPoints(path)) { + requiredPaths.add(mount); + } + + // Get files/dirs from the Namenodes + PathLocation location = subclusterResolver.getDestinationForPath(path); + for (RemoteLocation loc : location.getDestinations()) { + String nsId = loc.getNameserviceId(); + String dest = loc.getDest(); + NamenodeContext nn = getCluster().getNamenode(nsId, null); + FileSystem fs = nn.getFileSystem(); + FileStatus[] files = fs.listStatus(new Path(dest)); + for (FileStatus file : files) { + String pathName = file.getPath().getName(); + requiredPaths.add(pathName); + } + } + + // Get files/dirs from the Router + DirectoryListing listing = + getRouterProtocol().getListing(path, HdfsFileStatus.EMPTY_NAME, false); + Iterator<String> requiredPathsIterator = requiredPaths.iterator(); + + // Match each path returned and verify order returned + HdfsFileStatus[] partialListing = listing.getPartialListing(); + for (HdfsFileStatus fileStatus : listing.getPartialListing()) { + String fileName = requiredPathsIterator.next(); + String currentFile = fileStatus.getFullPath(new Path(path)).getName(); + assertEquals(currentFile, fileName); + } + + // Verify the total number of results found/matched + assertEquals( + requiredPaths + " doesn't match " + Arrays.toString(partialListing), + requiredPaths.size(), partialListing.length); + } + + @Override + public void testProxyListFiles() throws IOException, InterruptedException, + URISyntaxException, NoSuchMethodException, SecurityException { + + // Verify that the root listing is a union of the mount table destinations + // and the files stored at all nameservices mounted at the root (ns0 + ns1) + // / --> + // /ns0 (from mount table) + // /ns1 (from mount table) + // /same (from the mount table) + // all items in / of ns0 from mapping of / -> ns0:::/) + // all items in / of ns1 from mapping of / -> ns1:::/) + testListing("/"); + + // Verify that the "/same" mount point lists the contents of both dirs in + // the same ns + // /same --> + // /target-ns0 (from root of ns0) + // /testdir (from contents of /target-ns0) + testListing("/same"); + + // List a non-existing path and validate error response with NN behavior + ClientProtocol namenodeProtocol = + getCluster().getRandomNamenode().getClient().getNamenode(); + Method m = ClientProtocol.class.getMethod( + "getListing", String.class, byte[].class, boolean.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(getRouterProtocol(), namenodeProtocol, m, + new Object[] {badPath, HdfsFileStatus.EMPTY_NAME, false}); + } + + @Override + public void testProxyRenameFiles() throws IOException, InterruptedException { + + super.testProxyRenameFiles(); + + List<String> nss = getCluster().getNameservices(); + String ns0 = nss.get(0); + String ns1 = nss.get(1); + + // Rename a file from ns0 into the root (mapped to both ns0 and ns1) + String testDir0 = getCluster().getFederatedTestDirectoryForNS(ns0); + String filename0 = testDir0 + "/testrename"; + String renamedFile = "/testrename"; + testRename(getRouterContext(), filename0, renamedFile, false); + testRename2(getRouterContext(), filename0, renamedFile, false); + + // Rename a file from ns1 into the root (mapped to both ns0 and ns1) + String testDir1 = getCluster().getFederatedTestDirectoryForNS(ns1); + String filename1 = testDir1 + "/testrename"; + testRename(getRouterContext(), filename1, renamedFile, false); + testRename2(getRouterContext(), filename1, renamedFile, false); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org