http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java new file mode 100644 index 0000000..2b12a25 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -0,0 +1,1072 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.addDirectory; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.countContents; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.deleteFile; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getFileStatus; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists; +import static org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.TEST_STRING; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.CryptoProtocolVersion; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; +import 
org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.io.erasurecode.ECSchema; +import org.apache.hadoop.io.erasurecode.ErasureCodeConstants; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.Service.STATE; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Supplier; +import com.google.common.collect.Maps; + +/** + * Test the RPC interface of the {@link Router} implemented by + * {@link RouterRpcServer}. 
+ */ +public class TestRouterRpc { + + private static final Logger LOG = + LoggerFactory.getLogger(TestRouterRpc.class); + + private static final Comparator<ErasureCodingPolicyInfo> EC_POLICY_CMP = + new Comparator<ErasureCodingPolicyInfo>() { + public int compare( + ErasureCodingPolicyInfo ec0, + ErasureCodingPolicyInfo ec1) { + String name0 = ec0.getPolicy().getName(); + String name1 = ec1.getPolicy().getName(); + return name0.compareTo(name1); + } + }; + + /** Federated HDFS cluster. */ + private static RouterDFSCluster cluster; + + /** Random Router for this federated cluster. */ + private RouterContext router; + + /** Random nameservice in the federated cluster. */ + private String ns; + /** First namenode in the nameservice. */ + private NamenodeContext namenode; + + /** Client interface to the Router. */ + private ClientProtocol routerProtocol; + /** Client interface to the Namenode. */ + private ClientProtocol nnProtocol; + + /** Filesystem interface to the Router. */ + private FileSystem routerFS; + /** Filesystem interface to the Namenode. */ + private FileSystem nnFS; + + /** File in the Router. */ + private String routerFile; + /** File in the Namenode. 
*/ + private String nnFile; + + + @BeforeClass + public static void globalSetUp() throws Exception { + cluster = new RouterDFSCluster(false, 2); + // We need 6 DNs to test Erasure Coding with RS-6-3-64k + cluster.setNumDatanodesPerNameservice(6); + + // Start NNs and DNs and wait until ready + cluster.startCluster(); + + // Start routers with only an RPC service + cluster.addRouterOverrides((new RouterConfigBuilder()).rpc().build()); + cluster.startRouters(); + + // Register and verify all NNs with all routers + cluster.registerNamenodes(); + cluster.waitNamenodeRegistration(); + } + + @AfterClass + public static void tearDown() { + cluster.shutdown(); + } + + @Before + public void testSetup() throws Exception { + + // Create mock locations + cluster.installMockLocations(); + + // Delete all files via the NNs and verify + cluster.deleteAllFiles(); + + // Create test fixtures on NN + cluster.createTestDirectoriesNamenode(); + + // Wait to ensure NN has fully created its test directories + Thread.sleep(100); + + // Default namenode and random router for this test + this.router = cluster.getRandomRouter(); + this.ns = cluster.getNameservices().get(0); + this.namenode = cluster.getNamenode(ns, null); + + // Handles to the ClientProtocol interface + this.routerProtocol = router.getClient().getNamenode(); + this.nnProtocol = namenode.getClient().getNamenode(); + + // Handles to the filesystem client + this.nnFS = namenode.getFileSystem(); + this.routerFS = router.getFileSystem(); + + // Create a test file on the NN + Random r = new Random(); + String randomFile = "testfile-" + r.nextInt(); + this.nnFile = + cluster.getNamenodeTestDirectoryForNS(ns) + "/" + randomFile; + this.routerFile = + cluster.getFederatedTestDirectoryForNS(ns) + "/" + randomFile; + + createFile(nnFS, nnFile, 32); + verifyFileExists(nnFS, nnFile); + } + + @Test + public void testRpcService() throws IOException { + Router testRouter = new Router(); + List<String> nss = cluster.getNameservices(); + 
String ns0 = nss.get(0); + Configuration routerConfig = cluster.generateRouterConfiguration(ns0, null); + RouterRpcServer server = new RouterRpcServer(routerConfig, testRouter, + testRouter.getNamenodeResolver(), testRouter.getSubclusterResolver()); + server.init(routerConfig); + assertEquals(STATE.INITED, server.getServiceState()); + server.start(); + assertEquals(STATE.STARTED, server.getServiceState()); + server.stop(); + assertEquals(STATE.STOPPED, server.getServiceState()); + server.close(); + testRouter.close(); + } + + protected RouterDFSCluster getCluster() { + return TestRouterRpc.cluster; + } + + protected RouterContext getRouterContext() { + return this.router; + } + + protected void setRouter(RouterContext r) + throws IOException, URISyntaxException { + this.router = r; + this.routerProtocol = r.getClient().getNamenode(); + this.routerFS = r.getFileSystem(); + } + + protected FileSystem getRouterFileSystem() { + return this.routerFS; + } + + protected FileSystem getNamenodeFileSystem() { + return this.nnFS; + } + + protected ClientProtocol getRouterProtocol() { + return this.routerProtocol; + } + + protected ClientProtocol getNamenodeProtocol() { + return this.nnProtocol; + } + + protected NamenodeContext getNamenode() { + return this.namenode; + } + + protected void setNamenodeFile(String filename) { + this.nnFile = filename; + } + + protected String getNamenodeFile() { + return this.nnFile; + } + + protected void setRouterFile(String filename) { + this.routerFile = filename; + } + + protected String getRouterFile() { + return this.routerFile; + } + + protected void setNamenode(NamenodeContext nn) + throws IOException, URISyntaxException { + this.namenode = nn; + this.nnProtocol = nn.getClient().getNamenode(); + this.nnFS = nn.getFileSystem(); + } + + protected String getNs() { + return this.ns; + } + + protected void setNs(String nameservice) { + this.ns = nameservice; + } + + protected static void compareResponses( + ClientProtocol protocol1, 
ClientProtocol protocol2, + Method m, Object[] paramList) { + + Object return1 = null; + Exception exception1 = null; + try { + return1 = m.invoke(protocol1, paramList); + } catch (Exception ex) { + exception1 = ex; + } + + Object return2 = null; + Exception exception2 = null; + try { + return2 = m.invoke(protocol2, paramList); + } catch (Exception ex) { + exception2 = ex; + } + + assertEquals(return1, return2); + if (exception1 == null && exception2 == null) { + return; + } + + assertEquals( + exception1.getCause().getClass(), + exception2.getCause().getClass()); + } + + @Test + public void testProxyListFiles() throws IOException, InterruptedException, + URISyntaxException, NoSuchMethodException, SecurityException { + + // Verify that the root listing is a union of the mount table destinations + // and the files stored at all nameservices mounted at the root (ns0 + ns1) + // + // / --> + // /ns0 (from mount table) + // /ns1 (from mount table) + // all items in / of ns0 (default NS) + + // Collect the mount table entries from the root mount point + Set<String> requiredPaths = new TreeSet<>(); + FileSubclusterResolver fileResolver = + router.getRouter().getSubclusterResolver(); + for (String mount : fileResolver.getMountPoints("/")) { + requiredPaths.add(mount); + } + + // Collect all files/dirs on the root path of the default NS + String defaultNs = cluster.getNameservices().get(0); + NamenodeContext nn = cluster.getNamenode(defaultNs, null); + FileStatus[] iterator = nn.getFileSystem().listStatus(new Path("/")); + for (FileStatus file : iterator) { + requiredPaths.add(file.getPath().getName()); + } + + // Fetch listing + DirectoryListing listing = + routerProtocol.getListing("/", HdfsFileStatus.EMPTY_NAME, false); + Iterator<String> requiredPathsIterator = requiredPaths.iterator(); + // Match each path returned and verify order returned + for(HdfsFileStatus f : listing.getPartialListing()) { + String fileName = requiredPathsIterator.next(); + String currentFile = 
f.getFullPath(new Path("/")).getName(); + assertEquals(currentFile, fileName); + } + + // Verify the total number of results found/matched + assertEquals(requiredPaths.size(), listing.getPartialListing().length); + + // List a path that doesn't exist and validate error response with NN + // behavior. + Method m = ClientProtocol.class.getMethod( + "getListing", String.class, byte[].class, boolean.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, HdfsFileStatus.EMPTY_NAME, false}); + } + + @Test + public void testProxyListFilesWithConflict() + throws IOException, InterruptedException { + + // Add a directory to the namespace that conflicts with a mount point + NamenodeContext nn = cluster.getNamenode(ns, null); + FileSystem nnFs = nn.getFileSystem(); + addDirectory(nnFs, cluster.getFederatedTestDirectoryForNS(ns)); + + FileSystem routerFs = router.getFileSystem(); + int initialCount = countContents(routerFs, "/"); + + // Root file system now for NS X: + // / -> + // /ns0 (mount table) + // /ns1 (mount table) + // /target-ns0 (the target folder for the NS0 mapped to /) + // /nsX (local directory that duplicates mount table) + int newCount = countContents(routerFs, "/"); + assertEquals(initialCount, newCount); + + // Verify that each root path is readable and contains one test directory + assertEquals(1, countContents(routerFs, cluster.getFederatedPathForNS(ns))); + + // Verify that real folder for the ns contains a single test directory + assertEquals(1, countContents(nnFs, cluster.getNamenodePathForNS(ns))); + + } + + protected void testRename(RouterContext testRouter, String filename, + String renamedFile, boolean exceptionExpected) throws IOException { + + createFile(testRouter.getFileSystem(), filename, 32); + // verify + verifyFileExists(testRouter.getFileSystem(), filename); + // rename + boolean exceptionThrown = false; + try { + DFSClient client = testRouter.getClient(); + 
ClientProtocol clientProtocol = client.getNamenode(); + clientProtocol.rename(filename, renamedFile); + } catch (Exception ex) { + exceptionThrown = true; + } + if (exceptionExpected) { + // Error was expected + assertTrue(exceptionThrown); + FileContext fileContext = testRouter.getFileContext(); + assertTrue(fileContext.delete(new Path(filename), true)); + } else { + // No error was expected + assertFalse(exceptionThrown); + // verify + assertTrue(verifyFileExists(testRouter.getFileSystem(), renamedFile)); + // delete + FileContext fileContext = testRouter.getFileContext(); + assertTrue(fileContext.delete(new Path(renamedFile), true)); + } + } + + protected void testRename2(RouterContext testRouter, String filename, + String renamedFile, boolean exceptionExpected) throws IOException { + createFile(testRouter.getFileSystem(), filename, 32); + // verify + verifyFileExists(testRouter.getFileSystem(), filename); + // rename + boolean exceptionThrown = false; + try { + DFSClient client = testRouter.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + clientProtocol.rename2(filename, renamedFile, new Options.Rename[] {}); + } catch (Exception ex) { + exceptionThrown = true; + } + assertEquals(exceptionExpected, exceptionThrown); + if (exceptionExpected) { + // Error was expected + FileContext fileContext = testRouter.getFileContext(); + assertTrue(fileContext.delete(new Path(filename), true)); + } else { + // verify + assertTrue(verifyFileExists(testRouter.getFileSystem(), renamedFile)); + // delete + FileContext fileContext = testRouter.getFileContext(); + assertTrue(fileContext.delete(new Path(renamedFile), true)); + } + } + + @Test + public void testProxyRenameFiles() throws IOException, InterruptedException { + + Thread.sleep(5000); + List<String> nss = cluster.getNameservices(); + String ns0 = nss.get(0); + String ns1 = nss.get(1); + + // Rename within the same namespace + // /ns0/testdir/testrename -> /ns0/testdir/testrename-append + String 
filename = + cluster.getFederatedTestDirectoryForNS(ns0) + "/testrename"; + String renamedFile = filename + "-append"; + testRename(router, filename, renamedFile, false); + testRename2(router, filename, renamedFile, false); + + // Rename a file to a destination that is in a different namespace (fails) + filename = cluster.getFederatedTestDirectoryForNS(ns0) + "/testrename"; + renamedFile = cluster.getFederatedTestDirectoryForNS(ns1) + "/testrename"; + testRename(router, filename, renamedFile, true); + testRename2(router, filename, renamedFile, true); + } + + @Test + public void testProxyChownFiles() throws Exception { + + String newUsername = "TestUser"; + String newGroup = "TestGroup"; + + // change owner + routerProtocol.setOwner(routerFile, newUsername, newGroup); + + // Verify with NN + FileStatus file = getFileStatus(namenode.getFileSystem(), nnFile); + assertEquals(file.getOwner(), newUsername); + assertEquals(file.getGroup(), newGroup); + + // Bad request and validate router response matches NN response. 
+ Method m = ClientProtocol.class.getMethod("setOwner", String.class, + String.class, String.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, newUsername, newGroup}); + } + + @Test + public void testProxyGetStats() throws Exception { + // Some of the statistics are out of sync because of the mini cluster + Supplier<Boolean> check = new Supplier<Boolean>() { + @Override + public Boolean get() { + try { + long[] combinedData = routerProtocol.getStats(); + long[] individualData = getAggregateStats(); + int len = Math.min(combinedData.length, individualData.length); + for (int i = 0; i < len; i++) { + if (combinedData[i] != individualData[i]) { + LOG.error("Stats for {} don't match: {} != {}", + i, combinedData[i], individualData[i]); + return false; + } + } + return true; + } catch (Exception e) { + LOG.error("Cannot get stats: {}", e.getMessage()); + return false; + } + } + }; + GenericTestUtils.waitFor(check, 500, 5 * 1000); + } + + /** + * Get the sum of each subcluster statistics. + * @return Aggregated statistics. + * @throws Exception If it cannot get the stats from the Router or Namenode. 
+ */ + private long[] getAggregateStats() throws Exception { + long[] individualData = new long[10]; + for (String nameservice : cluster.getNameservices()) { + NamenodeContext n = cluster.getNamenode(nameservice, null); + DFSClient client = n.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + long[] data = clientProtocol.getStats(); + for (int i = 0; i < data.length; i++) { + individualData[i] += data[i]; + } + } + return individualData; + } + + @Test + public void testProxyGetDatanodeReport() throws Exception { + + DatanodeInfo[] combinedData = + routerProtocol.getDatanodeReport(DatanodeReportType.ALL); + + Set<Integer> individualData = new HashSet<Integer>(); + for (String nameservice : cluster.getNameservices()) { + NamenodeContext n = cluster.getNamenode(nameservice, null); + DFSClient client = n.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + DatanodeInfo[] data = + clientProtocol.getDatanodeReport(DatanodeReportType.ALL); + for (int i = 0; i < data.length; i++) { + // Collect unique DNs based on their xfer port + DatanodeInfo info = data[i]; + individualData.add(info.getXferPort()); + } + } + assertEquals(combinedData.length, individualData.size()); + } + + @Test + public void testProxyGetDatanodeStorageReport() + throws IOException, InterruptedException, URISyntaxException { + + DatanodeStorageReport[] combinedData = + routerProtocol.getDatanodeStorageReport(DatanodeReportType.ALL); + + Set<String> individualData = new HashSet<>(); + for (String nameservice : cluster.getNameservices()) { + NamenodeContext n = cluster.getNamenode(nameservice, null); + DFSClient client = n.getClient(); + ClientProtocol clientProtocol = client.getNamenode(); + DatanodeStorageReport[] data = + clientProtocol.getDatanodeStorageReport(DatanodeReportType.ALL); + for (DatanodeStorageReport report : data) { + // Determine unique DN instances + DatanodeInfo dn = report.getDatanodeInfo(); + individualData.add(dn.toString()); + } + } + 
assertEquals(combinedData.length, individualData.size()); + } + + @Test + public void testProxyMkdir() throws Exception { + + // Check the initial folders + FileStatus[] filesInitial = routerFS.listStatus(new Path("/")); + + // Create a directory via the router at the root level + String dirPath = "/testdir"; + FsPermission permission = new FsPermission("705"); + routerProtocol.mkdirs(dirPath, permission, false); + + // Verify the root listing has the item via the router + FileStatus[] files = routerFS.listStatus(new Path("/")); + assertEquals(Arrays.toString(files) + " should be " + + Arrays.toString(filesInitial) + " + " + dirPath, + filesInitial.length + 1, files.length); + assertTrue(verifyFileExists(routerFS, dirPath)); + + // Verify the directory is present in only 1 Namenode + int foundCount = 0; + for (NamenodeContext n : cluster.getNamenodes()) { + if (verifyFileExists(n.getFileSystem(), dirPath)) { + foundCount++; + } + } + assertEquals(1, foundCount); + assertTrue(deleteFile(routerFS, dirPath)); + + // Validate router failure response matches NN failure response. + Method m = ClientProtocol.class.getMethod("mkdirs", String.class, + FsPermission.class, boolean.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, permission, false}); + } + + @Test + public void testProxyChmodFiles() throws Exception { + + FsPermission permission = new FsPermission("444"); + + // change permissions + routerProtocol.setPermission(routerFile, permission); + + // Validate permissions NN + FileStatus file = getFileStatus(namenode.getFileSystem(), nnFile); + assertEquals(permission, file.getPermission()); + + // Validate router failure response matches NN failure response. 
+ Method m = ClientProtocol.class.getMethod( + "setPermission", String.class, FsPermission.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, permission}); + } + + @Test + public void testProxySetReplication() throws Exception { + + // Check current replication via NN + FileStatus file = getFileStatus(nnFS, nnFile); + assertEquals(1, file.getReplication()); + + // increment replication via router + routerProtocol.setReplication(routerFile, (short) 2); + + // Verify via NN + file = getFileStatus(nnFS, nnFile); + assertEquals(2, file.getReplication()); + + // Validate router failure response matches NN failure response. + Method m = ClientProtocol.class.getMethod( + "setReplication", String.class, short.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, (short) 2}); + } + + @Test + public void testProxyTruncateFile() throws Exception { + + // Check file size via NN + FileStatus file = getFileStatus(nnFS, nnFile); + assertTrue(file.getLen() > 0); + + // Truncate to 0 bytes via router + routerProtocol.truncate(routerFile, 0, "testclient"); + + // Verify via NN + file = getFileStatus(nnFS, nnFile); + assertEquals(0, file.getLen()); + + // Validate router failure response matches NN failure response. + Method m = ClientProtocol.class.getMethod( + "truncate", String.class, long.class, String.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, (long) 0, "testclient"}); + } + + @Test + public void testProxyGetBlockLocations() throws Exception { + + // Fetch block locations via router + LocatedBlocks locations = + routerProtocol.getBlockLocations(routerFile, 0, 1024); + assertEquals(1, locations.getLocatedBlocks().size()); + + // Validate router failure response matches NN failure response. 
+ Method m = ClientProtocol.class.getMethod( + "getBlockLocations", String.class, long.class, long.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, + m, new Object[] {badPath, (long) 0, (long) 0}); + } + + @Test + public void testProxyStoragePolicy() throws Exception { + + // Query initial policy via NN + HdfsFileStatus status = namenode.getClient().getFileInfo(nnFile); + + // Set a random policy via router + BlockStoragePolicy[] policies = namenode.getClient().getStoragePolicies(); + BlockStoragePolicy policy = policies[0]; + + while (policy.isCopyOnCreateFile()) { + // Pick a non copy on create policy + Random rand = new Random(); + int randIndex = rand.nextInt(policies.length); + policy = policies[randIndex]; + } + routerProtocol.setStoragePolicy(routerFile, policy.getName()); + + // Verify policy via NN + HdfsFileStatus newStatus = namenode.getClient().getFileInfo(nnFile); + assertTrue(newStatus.getStoragePolicy() == policy.getId()); + assertTrue(newStatus.getStoragePolicy() != status.getStoragePolicy()); + + // Validate router failure response matches NN failure response. + Method m = ClientProtocol.class.getMethod("setStoragePolicy", String.class, + String.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, + m, new Object[] {badPath, "badpolicy"}); + } + + @Test + public void testProxyGetPreferedBlockSize() throws Exception { + + // Query via NN and Router and verify + long namenodeSize = nnProtocol.getPreferredBlockSize(nnFile); + long routerSize = routerProtocol.getPreferredBlockSize(routerFile); + assertEquals(routerSize, namenodeSize); + + // Validate router failure response matches NN failure response. 
+ Method m = ClientProtocol.class.getMethod( + "getPreferredBlockSize", String.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses( + routerProtocol, nnProtocol, m, new Object[] {badPath}); + } + + private void testConcat( + String source, String target, boolean failureExpected) { + boolean failure = false; + try { + // Concat test file with full block length file via router + routerProtocol.concat(target, new String[] {source}); + } catch (IOException ex) { + failure = true; + } + assertEquals(failureExpected, failure); + } + + @Test + public void testProxyConcatFile() throws Exception { + + // Create a stub file in the primary ns + String sameNameservice = ns; + String existingFile = + cluster.getFederatedTestDirectoryForNS(sameNameservice) + + "_concatfile"; + int existingFileSize = 32; + createFile(routerFS, existingFile, existingFileSize); + + // Identify an alternate nameservice that doesn't match the existing file + String alternateNameservice = null; + for (String n : cluster.getNameservices()) { + if (!n.equals(sameNameservice)) { + alternateNameservice = n; + break; + } + } + + // Create new files, must be a full block to use concat. One file is in the + // same namespace as the target file, the other is in a different namespace. 
+ String altRouterFile = + cluster.getFederatedTestDirectoryForNS(alternateNameservice) + + "_newfile"; + String sameRouterFile = + cluster.getFederatedTestDirectoryForNS(sameNameservice) + + "_newfile"; + createFile(routerFS, altRouterFile, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT); + createFile(routerFS, sameRouterFile, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT); + + // Concat in different namespaces, fails + testConcat(existingFile, altRouterFile, true); + + // Concat in same namespaces, succeeds + testConcat(existingFile, sameRouterFile, false); + + // Check target file length + FileStatus status = getFileStatus(routerFS, sameRouterFile); + assertEquals( + existingFileSize + DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, + status.getLen()); + + // Validate router failure response matches NN failure response. + Method m = ClientProtocol.class.getMethod( + "concat", String.class, String[].class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, new String[] {routerFile}}); + } + + @Test + public void testProxyAppend() throws Exception { + + // Append a test string via router + EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.APPEND); + DFSClient routerClient = getRouterContext().getClient(); + HdfsDataOutputStream stream = + routerClient.append(routerFile, 1024, createFlag, null, null); + stream.writeBytes(TEST_STRING); + stream.close(); + + // Verify file size via NN + FileStatus status = getFileStatus(nnFS, nnFile); + assertTrue(status.getLen() > TEST_STRING.length()); + + // Validate router failure response matches NN failure response. 
+ Method m = ClientProtocol.class.getMethod("append", String.class, + String.class, EnumSetWritable.class); + String badPath = "/unknownlocation/unknowndir"; + EnumSetWritable<CreateFlag> createFlagWritable = + new EnumSetWritable<CreateFlag>(createFlag); + compareResponses(routerProtocol, nnProtocol, m, + new Object[] {badPath, "testClient", createFlagWritable}); + } + + @Test + public void testProxyGetAdditionalDatanode() + throws IOException, InterruptedException, URISyntaxException { + + // Use primitive APIs to open a file, add a block, and get datanode location + EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.CREATE); + String clientName = getRouterContext().getClient().getClientName(); + String newRouterFile = routerFile + "_additionalDatanode"; + HdfsFileStatus status = routerProtocol.create( + newRouterFile, new FsPermission("777"), clientName, + new EnumSetWritable<CreateFlag>(createFlag), true, (short) 1, + (long) 1024, CryptoProtocolVersion.supported(), null); + + // Add a block via router (requires client to have same lease) + LocatedBlock block = routerProtocol.addBlock( + newRouterFile, clientName, null, null, + status.getFileId(), null, null); + + DatanodeInfo[] exclusions = new DatanodeInfo[0]; + LocatedBlock newBlock = routerProtocol.getAdditionalDatanode( + newRouterFile, status.getFileId(), block.getBlock(), + block.getLocations(), block.getStorageIDs(), exclusions, 1, clientName); + assertNotNull(newBlock); + } + + @Test + public void testProxyCreateFileAlternateUser() + throws IOException, URISyntaxException, InterruptedException { + + // Create via Router + String routerDir = cluster.getFederatedTestDirectoryForNS(ns); + String namenodeDir = cluster.getNamenodeTestDirectoryForNS(ns); + String newRouterFile = routerDir + "/unknownuser"; + String newNamenodeFile = namenodeDir + "/unknownuser"; + String username = "unknownuser"; + + // Allow all user access to dir + namenode.getFileContext().setPermission( + new Path(namenodeDir), new 
FsPermission("777")); + + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(username); + DFSClient client = getRouterContext().getClient(ugi); + client.create(newRouterFile, true); + + // Fetch via NN and check user + FileStatus status = getFileStatus(nnFS, newNamenodeFile); + assertEquals(status.getOwner(), username); + } + + @Test + public void testProxyGetFileInfoAcessException() throws IOException { + + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser("unknownuser"); + + // List files from the NN and trap the exception + Exception nnFailure = null; + try { + String testFile = cluster.getNamenodeTestFileForNS(ns); + namenode.getClient(ugi).getLocatedBlocks(testFile, 0); + } catch (Exception e) { + nnFailure = e; + } + assertNotNull(nnFailure); + + // List files from the router and trap the exception + Exception routerFailure = null; + try { + String testFile = cluster.getFederatedTestFileForNS(ns); + getRouterContext().getClient(ugi).getLocatedBlocks(testFile, 0); + } catch (Exception e) { + routerFailure = e; + } + assertNotNull(routerFailure); + + assertEquals(routerFailure.getClass(), nnFailure.getClass()); + } + + @Test + public void testErasureCoding() throws IOException { + + LOG.info("List the available erasurce coding policies"); + ErasureCodingPolicyInfo[] policies = checkErasureCodingPolicies(); + for (ErasureCodingPolicyInfo policy : policies) { + LOG.info(" {}", policy); + } + + LOG.info("List the erasure coding codecs"); + Map<String, String> codecsRouter = routerProtocol.getErasureCodingCodecs(); + Map<String, String> codecsNamenode = nnProtocol.getErasureCodingCodecs(); + assertTrue(Maps.difference(codecsRouter, codecsNamenode).areEqual()); + for (Entry<String, String> entry : codecsRouter.entrySet()) { + LOG.info(" {}: {}", entry.getKey(), entry.getValue()); + } + + LOG.info("Create a testing directory via the router at the root level"); + String dirPath = "/testec"; + String filePath1 = dirPath + "/testfile1"; + 
FsPermission permission = new FsPermission("755"); + routerProtocol.mkdirs(dirPath, permission, false); + createFile(routerFS, filePath1, 32); + assertTrue(verifyFileExists(routerFS, filePath1)); + DFSClient file1Protocol = getFileDFSClient(filePath1); + + LOG.info("The policy for the new file should not be set"); + assertNull(routerProtocol.getErasureCodingPolicy(filePath1)); + assertNull(file1Protocol.getErasureCodingPolicy(filePath1)); + + String policyName = "RS-6-3-1024k"; + LOG.info("Set policy \"{}\" for \"{}\"", policyName, dirPath); + routerProtocol.setErasureCodingPolicy(dirPath, policyName); + + String filePath2 = dirPath + "/testfile2"; + LOG.info("Create {} in the path with the new EC policy", filePath2); + createFile(routerFS, filePath2, 32); + assertTrue(verifyFileExists(routerFS, filePath2)); + DFSClient file2Protocol = getFileDFSClient(filePath2); + + LOG.info("Check that the policy is set for {}", filePath2); + ErasureCodingPolicy policyRouter1 = + routerProtocol.getErasureCodingPolicy(filePath2); + ErasureCodingPolicy policyNamenode1 = + file2Protocol.getErasureCodingPolicy(filePath2); + assertNotNull(policyRouter1); + assertEquals(policyName, policyRouter1.getName()); + assertEquals(policyName, policyNamenode1.getName()); + + LOG.info("Create a new erasure coding policy"); + String newPolicyName = "RS-6-3-128k"; + ECSchema ecSchema = new ECSchema(ErasureCodeConstants.RS_CODEC_NAME, 6, 3); + ErasureCodingPolicy ecPolicy = new ErasureCodingPolicy( + newPolicyName, + ecSchema, + 128 * 1024, + (byte) -1); + ErasureCodingPolicy[] newPolicies = new ErasureCodingPolicy[] { + ecPolicy + }; + AddErasureCodingPolicyResponse[] responses = + routerProtocol.addErasureCodingPolicies(newPolicies); + assertEquals(1, responses.length); + assertTrue(responses[0].isSucceed()); + routerProtocol.disableErasureCodingPolicy(newPolicyName); + + LOG.info("The new policy should be there and disabled"); + policies = checkErasureCodingPolicies(); + boolean found = false; + 
for (ErasureCodingPolicyInfo policy : policies) { + LOG.info(" {}" + policy); + if (policy.getPolicy().getName().equals(newPolicyName)) { + found = true; + assertEquals(ErasureCodingPolicyState.DISABLED, policy.getState()); + break; + } + } + assertTrue(found); + + LOG.info("Set the test folder to use the new policy"); + routerProtocol.enableErasureCodingPolicy(newPolicyName); + routerProtocol.setErasureCodingPolicy(dirPath, newPolicyName); + + LOG.info("Create a file in the path with the new EC policy"); + String filePath3 = dirPath + "/testfile3"; + createFile(routerFS, filePath3, 32); + assertTrue(verifyFileExists(routerFS, filePath3)); + DFSClient file3Protocol = getFileDFSClient(filePath3); + + ErasureCodingPolicy policyRouterFile3 = + routerProtocol.getErasureCodingPolicy(filePath3); + assertEquals(newPolicyName, policyRouterFile3.getName()); + ErasureCodingPolicy policyNamenodeFile3 = + file3Protocol.getErasureCodingPolicy(filePath3); + assertEquals(newPolicyName, policyNamenodeFile3.getName()); + + LOG.info("Remove the policy and check the one for the test folder"); + routerProtocol.removeErasureCodingPolicy(newPolicyName); + ErasureCodingPolicy policyRouter3 = + routerProtocol.getErasureCodingPolicy(filePath3); + assertEquals(newPolicyName, policyRouter3.getName()); + ErasureCodingPolicy policyNamenode3 = + file3Protocol.getErasureCodingPolicy(filePath3); + assertEquals(newPolicyName, policyNamenode3.getName()); + + LOG.info("Check the stats"); + ECBlockGroupStats statsRouter = routerProtocol.getECBlockGroupStats(); + ECBlockGroupStats statsNamenode = nnProtocol.getECBlockGroupStats(); + assertEquals(statsNamenode.toString(), statsRouter.toString()); + } + + /** + * Check the erasure coding policies in the Router and the Namenode. + * @return The erasure coding policies. 
+ */ + private ErasureCodingPolicyInfo[] checkErasureCodingPolicies() + throws IOException { + ErasureCodingPolicyInfo[] policiesRouter = + routerProtocol.getErasureCodingPolicies(); + assertNotNull(policiesRouter); + ErasureCodingPolicyInfo[] policiesNamenode = + nnProtocol.getErasureCodingPolicies(); + Arrays.sort(policiesRouter, EC_POLICY_CMP); + Arrays.sort(policiesNamenode, EC_POLICY_CMP); + assertArrayEquals(policiesRouter, policiesNamenode); + return policiesRouter; + } + + /** + * Find the Namenode for a particular file and return the DFSClient. + * @param path Path of the file to check. + * @return The DFSClient to the Namenode holding the file. + */ + private DFSClient getFileDFSClient(final String path) { + for (String nsId : cluster.getNameservices()) { + LOG.info("Checking {} for {}", nsId, path); + NamenodeContext nn = cluster.getNamenode(nsId, null); + try { + DFSClient nnClientProtocol = nn.getClient(); + if (nnClientProtocol.getFileInfo(path) != null) { + return nnClientProtocol; + } + } catch (Exception ignore) { + // ignore + } + } + return null; + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java new file mode 100644 index 0000000..5489691 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.federation.MockResolver; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; + +/** + * Test the RPC interface of the {@link Router} implemented by + * {@link RouterRpcServer}. 
+ */ +public class TestRouterRpcMultiDestination extends TestRouterRpc { + + @Override + public void testSetup() throws Exception { + + RouterDFSCluster cluster = getCluster(); + + // Create mock locations + getCluster().installMockLocations(); + List<RouterContext> routers = cluster.getRouters(); + + // Add extra location to the root mount / such that the root mount points: + // / + // ns0 -> / + // ns1 -> / + for (RouterContext rc : routers) { + Router router = rc.getRouter(); + MockResolver resolver = (MockResolver) router.getSubclusterResolver(); + resolver.addLocation("/", cluster.getNameservices().get(1), "/"); + } + + // Create a mount that points to 2 dirs in the same ns: + // /same + // ns0 -> / + // ns0 -> /target-ns0 + for (RouterContext rc : routers) { + Router router = rc.getRouter(); + MockResolver resolver = (MockResolver) router.getSubclusterResolver(); + List<String> nss = cluster.getNameservices(); + String ns0 = nss.get(0); + resolver.addLocation("/same", ns0, "/"); + resolver.addLocation("/same", ns0, cluster.getNamenodePathForNS(ns0)); + } + + // Delete all files via the NNs and verify + cluster.deleteAllFiles(); + + // Create test fixtures on NN + cluster.createTestDirectoriesNamenode(); + + // Wait to ensure NN has fully created its test directories + Thread.sleep(100); + + // Pick a NS, namenode and router for this test + RouterContext router = cluster.getRandomRouter(); + this.setRouter(router); + + String ns = cluster.getRandomNameservice(); + this.setNs(ns); + this.setNamenode(cluster.getNamenode(ns, null)); + + // Create a test file on a single NN that is accessed via a router path + // with 2 destinations. All tests should failover to the alternate + // destination if the wrong NN is attempted first. 
+ Random r = new Random(); + String randomString = "testfile-" + r.nextInt(); + setNamenodeFile("/" + randomString); + setRouterFile("/" + randomString); + + FileSystem nnFs = getNamenodeFileSystem(); + FileSystem routerFs = getRouterFileSystem(); + createFile(nnFs, getNamenodeFile(), 32); + + verifyFileExists(nnFs, getNamenodeFile()); + verifyFileExists(routerFs, getRouterFile()); + } + + private void testListing(String path) throws IOException { + + // Collect the mount table entries for this path + Set<String> requiredPaths = new TreeSet<>(); + RouterContext rc = getRouterContext(); + Router router = rc.getRouter(); + FileSubclusterResolver subclusterResolver = router.getSubclusterResolver(); + for (String mount : subclusterResolver.getMountPoints(path)) { + requiredPaths.add(mount); + } + + // Get files/dirs from the Namenodes + PathLocation location = subclusterResolver.getDestinationForPath(path); + for (RemoteLocation loc : location.getDestinations()) { + String nsId = loc.getNameserviceId(); + String dest = loc.getDest(); + NamenodeContext nn = getCluster().getNamenode(nsId, null); + FileSystem fs = nn.getFileSystem(); + FileStatus[] files = fs.listStatus(new Path(dest)); + for (FileStatus file : files) { + String pathName = file.getPath().getName(); + requiredPaths.add(pathName); + } + } + + // Get files/dirs from the Router + DirectoryListing listing = + getRouterProtocol().getListing(path, HdfsFileStatus.EMPTY_NAME, false); + Iterator<String> requiredPathsIterator = requiredPaths.iterator(); + + // Match each path returned and verify order returned + HdfsFileStatus[] partialListing = listing.getPartialListing(); + for (HdfsFileStatus fileStatus : listing.getPartialListing()) { + String fileName = requiredPathsIterator.next(); + String currentFile = fileStatus.getFullPath(new Path(path)).getName(); + assertEquals(currentFile, fileName); + } + + // Verify the total number of results found/matched + assertEquals( + requiredPaths + " doesn't match " + 
Arrays.toString(partialListing), + requiredPaths.size(), partialListing.length); + } + + @Override + public void testProxyListFiles() throws IOException, InterruptedException, + URISyntaxException, NoSuchMethodException, SecurityException { + + // Verify that the root listing is a union of the mount table destinations + // and the files stored at all nameservices mounted at the root (ns0 + ns1) + // / --> + // /ns0 (from mount table) + // /ns1 (from mount table) + // /same (from the mount table) + // all items in / of ns0 from mapping of / -> ns0:::/) + // all items in / of ns1 from mapping of / -> ns1:::/) + testListing("/"); + + // Verify that the "/same" mount point lists the contents of both dirs in + // the same ns + // /same --> + // /target-ns0 (from root of ns0) + // /testdir (from contents of /target-ns0) + testListing("/same"); + + // List a non-existing path and validate error response with NN behavior + ClientProtocol namenodeProtocol = + getCluster().getRandomNamenode().getClient().getNamenode(); + Method m = ClientProtocol.class.getMethod( + "getListing", String.class, byte[].class, boolean.class); + String badPath = "/unknownlocation/unknowndir"; + compareResponses(getRouterProtocol(), namenodeProtocol, m, + new Object[] {badPath, HdfsFileStatus.EMPTY_NAME, false}); + } + + @Override + public void testProxyRenameFiles() throws IOException, InterruptedException { + + super.testProxyRenameFiles(); + + List<String> nss = getCluster().getNameservices(); + String ns0 = nss.get(0); + String ns1 = nss.get(1); + + // Rename a file from ns0 into the root (mapped to both ns0 and ns1) + String testDir0 = getCluster().getFederatedTestDirectoryForNS(ns0); + String filename0 = testDir0 + "/testrename"; + String renamedFile = "/testrename"; + testRename(getRouterContext(), filename0, renamedFile, false); + testRename2(getRouterContext(), filename0, renamedFile, false); + + // Rename a file from ns1 into the root (mapped to both ns0 and ns1) + String testDir1 = 
getCluster().getFederatedTestDirectoryForNS(ns1); + String filename1 = testDir1 + "/testrename"; + testRename(getRouterContext(), filename1, renamedFile, false); + testRename2(getRouterContext(), filename1, renamedFile, false); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java new file mode 100644 index 0000000..e5d8348 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_SAFEMODE_EXPIRATION; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_SAFEMODE_EXTENSION; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.deleteStateStore; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.service.Service.STATE; +import org.apache.hadoop.util.Time; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test the safe mode for the {@link Router} controlled by + * {@link RouterSafemodeService}. 
+ */ +public class TestRouterSafemode { + + private Router router; + private static Configuration conf; + + @BeforeClass + public static void create() throws IOException { + // Wipe state store + deleteStateStore(); + // Configuration that supports the state store + conf = getStateStoreConfiguration(); + // 2 sec startup standby + conf.setTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION, + TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS); + // 1 sec cache refresh + conf.setTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, + TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS); + // 2 sec post cache update before entering safemode (2 intervals) + conf.setTimeDuration(DFS_ROUTER_SAFEMODE_EXPIRATION, + TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS); + + conf.set(RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0"); + conf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(RBFConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY, "0.0.0.0"); + conf.set(RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(RBFConfigKeys.DFS_ROUTER_HTTPS_ADDRESS_KEY, "127.0.0.1:0"); + + // RPC + State Store + Safe Mode only + conf = new RouterConfigBuilder(conf) + .rpc() + .safemode() + .stateStore() + .metrics() + .build(); + } + + @AfterClass + public static void destroy() { + } + + @Before + public void setup() throws IOException, URISyntaxException { + router = new Router(); + router.init(conf); + router.start(); + } + + @After + public void cleanup() throws IOException { + if (router != null) { + router.stop(); + router = null; + } + } + + @Test + public void testSafemodeService() throws IOException { + RouterSafemodeService server = new RouterSafemodeService(router); + server.init(conf); + assertEquals(STATE.INITED, server.getServiceState()); + server.start(); + assertEquals(STATE.STARTED, server.getServiceState()); + server.stop(); + assertEquals(STATE.STOPPED, 
server.getServiceState()); + server.close(); + } + + @Test + public void testRouterExitSafemode() + throws InterruptedException, IllegalStateException, IOException { + + assertTrue(router.getRpcServer().isInSafeMode()); + verifyRouter(RouterServiceState.SAFEMODE); + + // Wait for initial time in milliseconds + long interval = + conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION, + TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) + + conf.getTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, + TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS); + Thread.sleep(interval); + + assertFalse(router.getRpcServer().isInSafeMode()); + verifyRouter(RouterServiceState.RUNNING); + } + + @Test + public void testRouterEnterSafemode() + throws IllegalStateException, IOException, InterruptedException { + + // Verify starting state + assertTrue(router.getRpcServer().isInSafeMode()); + verifyRouter(RouterServiceState.SAFEMODE); + + // We should be in safe mode for DFS_ROUTER_SAFEMODE_EXTENSION time + long interval0 = conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION, + TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) - 1000; + long t0 = Time.now(); + while (Time.now() - t0 < interval0) { + verifyRouter(RouterServiceState.SAFEMODE); + Thread.sleep(100); + } + + // We wait some time for the state to propagate + long interval1 = 1000 + 2 * conf.getTimeDuration( + DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, TimeUnit.SECONDS.toMillis(1), + TimeUnit.MILLISECONDS); + Thread.sleep(interval1); + + // Running + assertFalse(router.getRpcServer().isInSafeMode()); + verifyRouter(RouterServiceState.RUNNING); + + // Disable cache + router.getStateStore().stopCacheUpdateService(); + + // Wait until the State Store cache is stale in milliseconds + long interval2 = + conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXPIRATION, + TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) + + conf.getTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, + TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS); + 
Thread.sleep(interval2); + + // Safemode + assertTrue(router.getRpcServer().isInSafeMode()); + verifyRouter(RouterServiceState.SAFEMODE); + } + + @Test + public void testRouterRpcSafeMode() + throws IllegalStateException, IOException { + + assertTrue(router.getRpcServer().isInSafeMode()); + verifyRouter(RouterServiceState.SAFEMODE); + + // If the Router is in Safe Mode, we should get a SafeModeException + boolean exception = false; + try { + router.getRpcServer().delete("/testfile.txt", true); + fail("We should have thrown a safe mode exception"); + } catch (RouterSafeModeException sme) { + exception = true; + } + assertTrue("We should have thrown a safe mode exception", exception); + } + + private void verifyRouter(RouterServiceState status) + throws IllegalStateException, IOException { + assertEquals(status, router.getRouterState()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java new file mode 100644 index 0000000..2ec5d62 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java @@ -0,0 +1,274 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_STORE_DRIVER_CLASS; +import static org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl.FEDERATION_STORE_FILE_DIRECTORY; +import static org.junit.Assert.assertNotNull; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl; +import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import 
org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.util.Time; + +/** + * Utilities to test the State Store. + */ +public final class FederationStateStoreTestUtils { + + /** The State Store Driver implementation class for testing. */ + private static final Class<? extends StateStoreDriver> + FEDERATION_STORE_DRIVER_CLASS_FOR_TEST = StateStoreFileImpl.class; + + private FederationStateStoreTestUtils() { + // Utility Class + } + + /** + * Get the State Store driver implementation for testing. + * + * @return Class of the State Store driver implementation. + */ + public static Class<? extends StateStoreDriver> getTestDriverClass() { + return FEDERATION_STORE_DRIVER_CLASS_FOR_TEST; + } + + /** + * Create a default State Store configuration. + * + * @return State Store configuration. + */ + public static Configuration getStateStoreConfiguration() { + Class<? extends StateStoreDriver> clazz = getTestDriverClass(); + return getStateStoreConfiguration(clazz); + } + + /** + * Create a new State Store configuration for a particular driver. + * + * @param clazz Class of the driver to create. + * @return State Store configuration. + */ + public static Configuration getStateStoreConfiguration( + Class<? extends StateStoreDriver> clazz) { + Configuration conf = new HdfsConfiguration(false); + + conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true); + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "hdfs://test"); + + conf.setClass(FEDERATION_STORE_DRIVER_CLASS, clazz, StateStoreDriver.class); + + // Drivers derived from the file base implementation need the directory + // for their records; same check direction as deleteStateStore + if (StateStoreFileBaseImpl.class.isAssignableFrom(clazz)) { + setFileConfiguration(conf); + } + return conf; + } + + /** + * Create a new State Store based on a configuration. + * + * @param configuration Configuration for the State Store. + * @return New State Store service. + * @throws IOException If it cannot create the State Store. 
+ * @throws InterruptedException If we cannot wait for the store to start. + */ + public static StateStoreService newStateStore( + Configuration configuration) throws IOException, InterruptedException { + + StateStoreService stateStore = new StateStoreService(); + assertNotNull(stateStore); + + // Set unique identifier, this is normally the router address + String identifier = UUID.randomUUID().toString(); + stateStore.setIdentifier(identifier); + + stateStore.init(configuration); + stateStore.start(); + + // Wait for state store to connect + waitStateStore(stateStore, TimeUnit.SECONDS.toMillis(10)); + + return stateStore; + } + + /** + * Wait for the State Store to initialize its driver. + * + * @param stateStore State Store. + * @param timeoutMs Time out in milliseconds. + * @throws IOException If the State Store cannot be reached. + * @throws InterruptedException If the sleep is interrupted. + */ + public static void waitStateStore(StateStoreService stateStore, + long timeoutMs) throws IOException, InterruptedException { + long startingTime = Time.monotonicNow(); + while (!stateStore.isDriverReady()) { + Thread.sleep(100); + if (Time.monotonicNow() - startingTime > timeoutMs) { + throw new IOException("Timeout waiting for State Store to connect"); + } + } + } + + /** + * Delete the default State Store. + * + * @throws IOException + */ + public static void deleteStateStore() throws IOException { + Class<? extends StateStoreDriver> driverClass = getTestDriverClass(); + deleteStateStore(driverClass); + } + + /** + * Delete the State Store. + * @param driverClass Class of the State Store driver implementation. + * @throws IOException If it cannot be deleted. + */ + public static void deleteStateStore( + Class<? 
extends StateStoreDriver> driverClass) throws IOException { + + if (StateStoreFileBaseImpl.class.isAssignableFrom(driverClass)) { + String workingDirectory = System.getProperty("user.dir"); + File dir = new File(workingDirectory + "/statestore"); + if (dir.exists()) { + FileUtils.cleanDirectory(dir); + } + } + } + + /** + * Set the default configuration for drivers based on files. + * + * @param conf Configuration to extend. + */ + public static void setFileConfiguration(Configuration conf) { + String workingPath = System.getProperty("user.dir"); + String stateStorePath = workingPath + "/statestore"; + conf.set(FEDERATION_STORE_FILE_DIRECTORY, stateStorePath); + } + + /** + * Clear all the records from the State Store. + * + * @param store State Store to remove records from. + * @return If the State Store was cleared. + * @throws IOException If it cannot clear the State Store. + */ + public static boolean clearAllRecords(StateStoreService store) + throws IOException { + Collection<Class<? extends BaseRecord>> allRecords = + store.getSupportedRecords(); + for (Class<? extends BaseRecord> recordType : allRecords) { + if (!clearRecords(store, recordType)) { + return false; + } + } + return true; + } + + /** + * Clear records from a certain type from the State Store. + * + * @param store State Store to remove records from. + * @param recordClass Class of the records to remove. + * @return If the State Store was cleared. + * @throws IOException If it cannot clear the State Store. + */ + public static <T extends BaseRecord> boolean clearRecords( + StateStoreService store, Class<T> recordClass) throws IOException { + List<T> emptyList = new ArrayList<>(); + if (!synchronizeRecords(store, emptyList, recordClass)) { + return false; + } + store.refreshCaches(true); + return true; + } + + /** + * Synchronize a set of records. Remove all and keep the ones specified. + * + * @param stateStore State Store service managing the driver. + * @param records Records to add. 
+ * @param clazz Class of the record to synchronize. + * @return If the synchronization succeeded. + * @throws IOException If it cannot connect to the State Store. + */ + public static <T extends BaseRecord> boolean synchronizeRecords( + StateStoreService stateStore, List<T> records, Class<T> clazz) + throws IOException { + StateStoreDriver driver = stateStore.getDriver(); + driver.verifyDriverReady(); + if (driver.removeAll(clazz)) { + if (driver.putAll(records, true, false)) { + return true; + } + } + return false; + } + + public static List<MountTable> createMockMountTable( + List<String> nameservices) throws IOException { + // create table entries + List<MountTable> entries = new ArrayList<>(); + for (String ns : nameservices) { + Map<String, String> destMap = new HashMap<>(); + destMap.put(ns, "/target-" + ns); + MountTable entry = MountTable.newInstance("/" + ns, destMap); + entries.add(entry); + } + return entries; + } + + public static MembershipState createMockRegistrationForNamenode( + String nameserviceId, String namenodeId, + FederationNamenodeServiceState state) throws IOException { + MembershipState entry = MembershipState.newInstance( + "routerId", nameserviceId, namenodeId, "clusterId", "test", + "0.0.0.0:0", "0.0.0.0:0", "0.0.0.0:0", "0.0.0.0:0", state, false); + MembershipStats stats = MembershipStats.newInstance(); + stats.setNumOfActiveDatanodes(100); + stats.setNumOfDeadDatanodes(10); + stats.setNumOfDecommissioningDatanodes(20); + stats.setNumOfDecomActiveDatanodes(15); + stats.setNumOfDecomDeadDatanodes(5); + stats.setNumOfBlocks(10); + entry.setStats(stats); + return entry; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreBase.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreBase.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreBase.java new file mode 100644 index 0000000..247d866 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreBase.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.store; + +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.newStateStore; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.waitStateStore; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +/** + * Base class for tests that need a {@link StateStoreService}. A single + * service and a single {@link Configuration} are kept in static fields: the + * configuration is created once per test class, the service is created + * before the first test and is stopped and closed after the last one. + */ +public class TestStateStoreBase { + + private static StateStoreService stateStore; + private static Configuration conf; + /** + * @return State Store shared by the tests; null until the first + * {@link #setupBase()} run and again after {@link #destroyBase()}. + */ + protected static StateStoreService getStateStore() { + return stateStore; + } + /** + * @return Configuration created by {@link #createBase()}. + */ + protected static Configuration getConf() { + return conf; + } + /** + * Create the configuration shared by the tests of the class and set the + * State Store connection test interval to one hour. + */ + @BeforeClass + public static void createBase() throws IOException, InterruptedException { + + conf = getStateStoreConfiguration(); + + // Disable auto-reconnect to data store: the connection test interval is raised to one hour + conf.setLong(RBFConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS, + TimeUnit.HOURS.toMillis(1)); + } + /** + * Stop and close the shared State Store, if one was created, and drop + * the reference to it. + */ + @AfterClass + public static void destroyBase() throws Exception { + if (stateStore != null) { + stateStore.stop(); + stateStore.close(); + stateStore = null; + } + } + /** + * Create the shared State Store on first use. Before every test, load + * its driver again and wait for the store. + */ + @Before + public void setupBase() throws IOException, InterruptedException, + InstantiationException, IllegalAccessException { + if (stateStore == null) { + stateStore = newStateStore(conf); + assertNotNull(stateStore); + } + // Load the driver and wait for state store to connect (10 seconds, in ms, are passed to waitStateStore) + stateStore.loadDriver(); + waitStateStore(stateStore, TimeUnit.SECONDS.toMillis(10)); + } +} \ No newline at end of file --------------------------------------------------------------------- 
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org