http://git-wip-us.apache.org/repos/asf/hadoop/blob/8239e3af/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java deleted file mode 100644 index 1023616..0000000 --- a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java +++ /dev/null @@ -1,934 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.namenode; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; -import java.nio.file.Files; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Random; -import java.util.Set; - -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.DFSClient; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSTestUtil; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap; -import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; -import org.apache.hadoop.hdfs.server.blockmanagement.ProvidedStorageMap; -import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap; -import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.InMemoryLevelDBAliasMapClient; -import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap; -import org.apache.hadoop.hdfs.server.datanode.DataNode; - -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY; - -import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; -import org.apache.hadoop.hdfs.server.protocol.StorageReport; -import org.apache.hadoop.net.NodeBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap.fileNameFromBlockPoolID; -import static org.apache.hadoop.net.NodeBase.PATH_SEPARATOR_STR; -import static org.junit.Assert.*; - -public class TestNameNodeProvidedImplementation { - - @Rule public TestName name = new TestName(); - public static final Logger LOG = - LoggerFactory.getLogger(TestNameNodeProvidedImplementation.class); - - final Random r = new Random(); - final File fBASE = new File(MiniDFSCluster.getBaseDirectory()); - final Path BASE = new Path(fBASE.toURI().toString()); - final Path NAMEPATH = new Path(BASE, "providedDir"); - final Path NNDIRPATH = new Path(BASE, "nnDir"); - final String SINGLEUSER = "usr1"; - final String SINGLEGROUP = "grp1"; - private final int numFiles = 10; - private final String filePrefix = "file"; - private final String fileSuffix = ".dat"; - private final int baseFileLen = 1024; - private long providedDataSize = 0; - private final String bpid = "BP-1234-10.1.1.1-1224"; - - Configuration conf; - MiniDFSCluster cluster; - - @Before - public void setSeed() throws Exception { - if (fBASE.exists() && !FileUtil.fullyDelete(fBASE)) { - throw new IOException("Could not fully delete " + fBASE); - } - long seed = r.nextLong(); - r.setSeed(seed); - System.out.println(name.getMethodName() + " seed: " + seed); - conf = new HdfsConfiguration(); - conf.set(SingleUGIResolver.USER, SINGLEUSER); - conf.set(SingleUGIResolver.GROUP, SINGLEGROUP); - - conf.set(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID, - DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT); - conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, true); - - conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS, - TextFileRegionAliasMap.class, BlockAliasMap.class); - conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_WRITE_DIR, - NNDIRPATH.toString()); - conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_READ_FILE, - new Path(NNDIRPATH, fileNameFromBlockPoolID(bpid)).toString()); - conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER, ","); - - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR_PROVIDED, - new File(NAMEPATH.toUri()).toString()); - File imageDir = new File(NAMEPATH.toUri()); - if (!imageDir.exists()) { - LOG.info("Creating directory: " + imageDir); - imageDir.mkdirs(); - } - - File nnDir = new File(NNDIRPATH.toUri()); - if (!nnDir.exists()) { - nnDir.mkdirs(); - } - - // create 10 random files under BASE - for (int i=0; i < numFiles; i++) { - File newFile = new File( - new Path(NAMEPATH, filePrefix + i + fileSuffix).toUri()); - if(!newFile.exists()) { - try { - LOG.info("Creating " + newFile.toString()); - newFile.createNewFile(); - Writer writer = new OutputStreamWriter( - new FileOutputStream(newFile.getAbsolutePath()), "utf-8"); - for(int j=0; j < baseFileLen*i; j++) { - writer.write("0"); - } - writer.flush(); - writer.close(); - providedDataSize += newFile.length(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - } - - @After - public void shutdown() throws Exception { - try { - if (cluster != null) { - cluster.shutdown(true, true); - } - } finally { - cluster = null; - } - } - - void createImage(TreeWalk t, Path out, - Class<? extends BlockResolver> blockIdsClass) throws Exception { - createImage(t, out, blockIdsClass, "", TextFileRegionAliasMap.class); - } - - void createImage(TreeWalk t, Path out, - Class<? extends BlockResolver> blockIdsClass, String clusterID, - Class<? extends BlockAliasMap> aliasMapClass) throws Exception { - ImageWriter.Options opts = ImageWriter.defaults(); - opts.setConf(conf); - opts.output(out.toString()) - .blocks(aliasMapClass) - .blockIds(blockIdsClass) - .clusterID(clusterID) - .blockPoolID(bpid); - try (ImageWriter w = new ImageWriter(opts)) { - for (TreePath e : t) { - w.accept(e); - } - } - } - void startCluster(Path nspath, int numDatanodes, - StorageType[] storageTypes, - StorageType[][] storageTypesPerDatanode, - boolean doFormat) throws IOException { - startCluster(nspath, numDatanodes, storageTypes, storageTypesPerDatanode, - doFormat, null); - } - - void startCluster(Path nspath, int numDatanodes, - StorageType[] storageTypes, - StorageType[][] storageTypesPerDatanode, - boolean doFormat, String[] racks) throws IOException { - conf.set(DFS_NAMENODE_NAME_DIR_KEY, nspath.toString()); - - if (storageTypesPerDatanode != null) { - cluster = new MiniDFSCluster.Builder(conf) - .format(doFormat) - .manageNameDfsDirs(doFormat) - .numDataNodes(numDatanodes) - .storageTypes(storageTypesPerDatanode) - .racks(racks) - .build(); - } else if (storageTypes != null) { - cluster = new MiniDFSCluster.Builder(conf) - .format(doFormat) - .manageNameDfsDirs(doFormat) - .numDataNodes(numDatanodes) - .storagesPerDatanode(storageTypes.length) - .storageTypes(storageTypes) - .racks(racks) - .build(); - } else { - cluster = new MiniDFSCluster.Builder(conf) - .format(doFormat) - .manageNameDfsDirs(doFormat) - .numDataNodes(numDatanodes) - .racks(racks) - .build(); - } - cluster.waitActive(); - } - - @Test(timeout=20000) - public void testLoadImage() throws Exception { - final long seed = r.nextLong(); - LOG.info("NAMEPATH: " + NAMEPATH); - createImage(new RandomTreeWalk(seed), NNDIRPATH, FixedBlockResolver.class); - startCluster(NNDIRPATH, 0, - new StorageType[] {StorageType.PROVIDED, StorageType.DISK}, null, - false); - - FileSystem fs = cluster.getFileSystem(); - for (TreePath e : new RandomTreeWalk(seed)) { - FileStatus rs = e.getFileStatus(); - Path hp = new Path(rs.getPath().toUri().getPath()); - assertTrue(fs.exists(hp)); - FileStatus hs = fs.getFileStatus(hp); - assertEquals(rs.getPath().toUri().getPath(), - hs.getPath().toUri().getPath()); - assertEquals(rs.getPermission(), hs.getPermission()); - assertEquals(rs.getLen(), hs.getLen()); - assertEquals(SINGLEUSER, hs.getOwner()); - assertEquals(SINGLEGROUP, hs.getGroup()); - assertEquals(rs.getAccessTime(), hs.getAccessTime()); - assertEquals(rs.getModificationTime(), hs.getModificationTime()); - } - } - - @Test(timeout=30000) - public void testProvidedReporting() throws Exception { - conf.setClass(ImageWriter.Options.UGI_CLASS, - SingleUGIResolver.class, UGIResolver.class); - createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH, - FixedBlockResolver.class); - int numDatanodes = 10; - startCluster(NNDIRPATH, numDatanodes, - new StorageType[] {StorageType.PROVIDED, StorageType.DISK}, null, - false); - long diskCapacity = 1000; - // set the DISK capacity for testing - for (DataNode dn: cluster.getDataNodes()) { - for (FsVolumeSpi ref : dn.getFSDataset().getFsVolumeReferences()) { - if (ref.getStorageType() == StorageType.DISK) { - ((FsVolumeImpl) ref).setCapacityForTesting(diskCapacity); - } - } - } - // trigger heartbeats to update the capacities - cluster.triggerHeartbeats(); - Thread.sleep(10000); - // verify namenode stats - FSNamesystem namesystem = cluster.getNameNode().getNamesystem(); - DatanodeStatistics dnStats = namesystem.getBlockManager() - .getDatanodeManager().getDatanodeStatistics(); - - // total capacity reported includes only the local volumes and - // not the provided capacity - assertEquals(diskCapacity * numDatanodes, namesystem.getTotal()); - - // total storage used should be equal to the totalProvidedStorage - // no capacity should be remaining! - assertEquals(providedDataSize, dnStats.getProvidedCapacity()); - assertEquals(providedDataSize, namesystem.getProvidedCapacityTotal()); - assertEquals(providedDataSize, dnStats.getStorageTypeStats() - .get(StorageType.PROVIDED).getCapacityTotal()); - assertEquals(providedDataSize, dnStats.getStorageTypeStats() - .get(StorageType.PROVIDED).getCapacityUsed()); - - // verify datanode stats - for (DataNode dn: cluster.getDataNodes()) { - for (StorageReport report : dn.getFSDataset() - .getStorageReports(namesystem.getBlockPoolId())) { - if (report.getStorage().getStorageType() == StorageType.PROVIDED) { - assertEquals(providedDataSize, report.getCapacity()); - assertEquals(providedDataSize, report.getDfsUsed()); - assertEquals(providedDataSize, report.getBlockPoolUsed()); - assertEquals(0, report.getNonDfsUsed()); - assertEquals(0, report.getRemaining()); - } - } - } - - DFSClient client = new DFSClient(new InetSocketAddress("localhost", - cluster.getNameNodePort()), cluster.getConfiguration(0)); - BlockManager bm = namesystem.getBlockManager(); - for (int fileId = 0; fileId < numFiles; fileId++) { - String filename = "/" + filePrefix + fileId + fileSuffix; - LocatedBlocks locatedBlocks = client.getLocatedBlocks( - filename, 0, baseFileLen); - for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { - BlockInfo blockInfo = - bm.getStoredBlock(locatedBlock.getBlock().getLocalBlock()); - Iterator<DatanodeStorageInfo> storagesItr = blockInfo.getStorageInfos(); - - DatanodeStorageInfo info = storagesItr.next(); - assertEquals(StorageType.PROVIDED, info.getStorageType()); - DatanodeDescriptor dnDesc = info.getDatanodeDescriptor(); - // check the locations that are returned by FSCK have the right name - assertEquals(ProvidedStorageMap.ProvidedDescriptor.NETWORK_LOCATION - + PATH_SEPARATOR_STR + ProvidedStorageMap.ProvidedDescriptor.NAME, - NodeBase.getPath(dnDesc)); - // no DatanodeStorageInfos should remain - assertFalse(storagesItr.hasNext()); - } - } - } - - @Test(timeout=500000) - public void testDefaultReplication() throws Exception { - int targetReplication = 2; - conf.setInt(FixedBlockMultiReplicaResolver.REPLICATION, targetReplication); - createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH, - FixedBlockMultiReplicaResolver.class); - // make the last Datanode with only DISK - startCluster(NNDIRPATH, 3, null, - new StorageType[][] { - {StorageType.PROVIDED, StorageType.DISK}, - {StorageType.PROVIDED, StorageType.DISK}, - {StorageType.DISK}}, - false); - // wait for the replication to finish - Thread.sleep(50000); - - FileSystem fs = cluster.getFileSystem(); - int count = 0; - for (TreePath e : new FSTreeWalk(NAMEPATH, conf)) { - FileStatus rs = e.getFileStatus(); - Path hp = removePrefix(NAMEPATH, rs.getPath()); - LOG.info("hp " + hp.toUri().getPath()); - //skip HDFS specific files, which may have been created later on. - if (hp.toString().contains("in_use.lock") - || hp.toString().contains("current")) { - continue; - } - e.accept(count++); - assertTrue(fs.exists(hp)); - FileStatus hs = fs.getFileStatus(hp); - - if (rs.isFile()) { - BlockLocation[] bl = fs.getFileBlockLocations( - hs.getPath(), 0, hs.getLen()); - int i = 0; - for(; i < bl.length; i++) { - int currentRep = bl[i].getHosts().length; - assertEquals(targetReplication , currentRep); - } - } - } - } - - - static Path removePrefix(Path base, Path walk) { - Path wpath = new Path(walk.toUri().getPath()); - Path bpath = new Path(base.toUri().getPath()); - Path ret = new Path("/"); - while (!(bpath.equals(wpath) || "".equals(wpath.getName()))) { - ret = "".equals(ret.getName()) - ? new Path("/", wpath.getName()) - : new Path(new Path("/", wpath.getName()), - new Path(ret.toString().substring(1))); - wpath = wpath.getParent(); - } - if (!bpath.equals(wpath)) { - throw new IllegalArgumentException(base + " not a prefix of " + walk); - } - return ret; - } - - private void verifyFileSystemContents() throws Exception { - FileSystem fs = cluster.getFileSystem(); - int count = 0; - // read NN metadata, verify contents match - for (TreePath e : new FSTreeWalk(NAMEPATH, conf)) { - FileStatus rs = e.getFileStatus(); - Path hp = removePrefix(NAMEPATH, rs.getPath()); - LOG.info("hp " + hp.toUri().getPath()); - //skip HDFS specific files, which may have been created later on. - if(hp.toString().contains("in_use.lock") - || hp.toString().contains("current")) { - continue; - } - e.accept(count++); - assertTrue(fs.exists(hp)); - FileStatus hs = fs.getFileStatus(hp); - assertEquals(hp.toUri().getPath(), hs.getPath().toUri().getPath()); - assertEquals(rs.getPermission(), hs.getPermission()); - assertEquals(rs.getOwner(), hs.getOwner()); - assertEquals(rs.getGroup(), hs.getGroup()); - - if (rs.isFile()) { - assertEquals(rs.getLen(), hs.getLen()); - try (ReadableByteChannel i = Channels.newChannel( - new FileInputStream(new File(rs.getPath().toUri())))) { - try (ReadableByteChannel j = Channels.newChannel( - fs.open(hs.getPath()))) { - ByteBuffer ib = ByteBuffer.allocate(4096); - ByteBuffer jb = ByteBuffer.allocate(4096); - while (true) { - int il = i.read(ib); - int jl = j.read(jb); - if (il < 0 || jl < 0) { - assertEquals(il, jl); - break; - } - ib.flip(); - jb.flip(); - int cmp = Math.min(ib.remaining(), jb.remaining()); - for (int k = 0; k < cmp; ++k) { - assertEquals(ib.get(), jb.get()); - } - ib.compact(); - jb.compact(); - } - - } - } - } - } - } - - private BlockLocation[] createFile(Path path, short replication, - long fileLen, long blockLen) throws IOException { - FileSystem fs = cluster.getFileSystem(); - //create a sample file that is not provided - DFSTestUtil.createFile(fs, path, false, (int) blockLen, - fileLen, blockLen, replication, 0, true); - return fs.getFileBlockLocations(path, 0, fileLen); - } - - @Test(timeout=30000) - public void testClusterWithEmptyImage() throws IOException { - // start a cluster with 2 datanodes without any provided storage - startCluster(NNDIRPATH, 2, null, - new StorageType[][] { - {StorageType.DISK}, - {StorageType.DISK}}, - true); - assertTrue(cluster.isClusterUp()); - assertTrue(cluster.isDataNodeUp()); - - BlockLocation[] locations = createFile(new Path("/testFile1.dat"), - (short) 2, 1024*1024, 1024*1024); - assertEquals(1, locations.length); - assertEquals(2, locations[0].getHosts().length); - } - - private DatanodeInfo[] getAndCheckBlockLocations(DFSClient client, - String filename, long fileLen, long expectedBlocks, int expectedLocations) - throws IOException { - LocatedBlocks locatedBlocks = client.getLocatedBlocks(filename, 0, fileLen); - // given the start and length in the above call, - // only one LocatedBlock in LocatedBlocks - assertEquals(expectedBlocks, locatedBlocks.getLocatedBlocks().size()); - DatanodeInfo[] locations = - locatedBlocks.getLocatedBlocks().get(0).getLocations(); - assertEquals(expectedLocations, locations.length); - checkUniqueness(locations); - return locations; - } - - /** - * verify that the given locations are all unique. - * @param locations - */ - private void checkUniqueness(DatanodeInfo[] locations) { - Set<String> set = new HashSet<>(); - for (DatanodeInfo info: locations) { - assertFalse("All locations should be unique", - set.contains(info.getDatanodeUuid())); - set.add(info.getDatanodeUuid()); - } - } - - /** - * Tests setting replication of provided files. - * @throws Exception - */ - @Test(timeout=50000) - public void testSetReplicationForProvidedFiles() throws Exception { - createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH, - FixedBlockResolver.class); - // 10 Datanodes with both DISK and PROVIDED storage - startCluster(NNDIRPATH, 10, - new StorageType[]{ - StorageType.PROVIDED, StorageType.DISK}, - null, - false); - setAndUnsetReplication("/" + filePrefix + (numFiles - 1) + fileSuffix); - } - - private void setAndUnsetReplication(String filename) throws Exception { - Path file = new Path(filename); - FileSystem fs = cluster.getFileSystem(); - // set the replication to 4, and test that the file has - // the required replication. - short newReplication = 4; - LOG.info("Setting replication of file {} to {}", filename, newReplication); - fs.setReplication(file, newReplication); - DFSTestUtil.waitForReplication((DistributedFileSystem) fs, - file, newReplication, 10000); - DFSClient client = new DFSClient(new InetSocketAddress("localhost", - cluster.getNameNodePort()), cluster.getConfiguration(0)); - getAndCheckBlockLocations(client, filename, baseFileLen, 1, newReplication); - - // set the replication back to 1 - newReplication = 1; - LOG.info("Setting replication of file {} back to {}", - filename, newReplication); - fs.setReplication(file, newReplication); - // defaultReplication number of replicas should be returned - int defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, - DFSConfigKeys.DFS_REPLICATION_DEFAULT); - DFSTestUtil.waitForReplication((DistributedFileSystem) fs, - file, (short) defaultReplication, 10000); - getAndCheckBlockLocations(client, filename, baseFileLen, 1, - defaultReplication); - } - - @Test(timeout=30000) - public void testProvidedDatanodeFailures() throws Exception { - createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH, - FixedBlockResolver.class); - startCluster(NNDIRPATH, 3, null, - new StorageType[][] { - {StorageType.PROVIDED, StorageType.DISK}, - {StorageType.PROVIDED, StorageType.DISK}, - {StorageType.DISK}}, - false); - - DataNode providedDatanode1 = cluster.getDataNodes().get(0); - DataNode providedDatanode2 = cluster.getDataNodes().get(1); - - DFSClient client = new DFSClient(new InetSocketAddress("localhost", - cluster.getNameNodePort()), cluster.getConfiguration(0)); - - DatanodeStorageInfo providedDNInfo = getProvidedDatanodeStorageInfo(); - - if (numFiles >= 1) { - String filename = "/" + filePrefix + (numFiles - 1) + fileSuffix; - // 2 locations returned as there are 2 PROVIDED datanodes - DatanodeInfo[] dnInfos = - getAndCheckBlockLocations(client, filename, baseFileLen, 1, 2); - //the location should be one of the provided DNs available - assertTrue( - dnInfos[0].getDatanodeUuid().equals( - providedDatanode1.getDatanodeUuid()) - || dnInfos[0].getDatanodeUuid().equals( - providedDatanode2.getDatanodeUuid())); - - //stop the 1st provided datanode - MiniDFSCluster.DataNodeProperties providedDNProperties1 = - cluster.stopDataNode(0); - - //make NameNode detect that datanode is down - BlockManagerTestUtil.noticeDeadDatanode( - cluster.getNameNode(), - providedDatanode1.getDatanodeId().getXferAddr()); - - //should find the block on the 2nd provided datanode - dnInfos = getAndCheckBlockLocations(client, filename, baseFileLen, 1, 1); - assertEquals(providedDatanode2.getDatanodeUuid(), - dnInfos[0].getDatanodeUuid()); - - // stop the 2nd provided datanode - MiniDFSCluster.DataNodeProperties providedDNProperties2 = - cluster.stopDataNode(0); - // make NameNode detect that datanode is down - BlockManagerTestUtil.noticeDeadDatanode( - cluster.getNameNode(), - providedDatanode2.getDatanodeId().getXferAddr()); - getAndCheckBlockLocations(client, filename, baseFileLen, 1, 0); - - // BR count for the provided ProvidedDatanodeStorageInfo should reset to - // 0, when all DNs with PROVIDED storage fail. - assertEquals(0, providedDNInfo.getBlockReportCount()); - //restart the provided datanode - cluster.restartDataNode(providedDNProperties1, true); - cluster.waitActive(); - - assertEquals(1, providedDNInfo.getBlockReportCount()); - - //should find the block on the 1st provided datanode now - dnInfos = getAndCheckBlockLocations(client, filename, baseFileLen, 1, 1); - //not comparing UUIDs as the datanode can now have a different one. - assertEquals(providedDatanode1.getDatanodeId().getXferAddr(), - dnInfos[0].getXferAddr()); - } - } - - @Test(timeout=300000) - public void testTransientDeadDatanodes() throws Exception { - createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH, - FixedBlockResolver.class); - // 3 Datanodes, 2 PROVIDED and other DISK - startCluster(NNDIRPATH, 3, null, - new StorageType[][] { - {StorageType.PROVIDED, StorageType.DISK}, - {StorageType.PROVIDED, StorageType.DISK}, - {StorageType.DISK}}, - false); - - DataNode providedDatanode = cluster.getDataNodes().get(0); - DatanodeStorageInfo providedDNInfo = getProvidedDatanodeStorageInfo(); - int initialBRCount = providedDNInfo.getBlockReportCount(); - for (int i= 0; i < numFiles; i++) { - // expect to have 2 locations as we have 2 provided Datanodes. - verifyFileLocation(i, 2); - // NameNode thinks the datanode is down - BlockManagerTestUtil.noticeDeadDatanode( - cluster.getNameNode(), - providedDatanode.getDatanodeId().getXferAddr()); - cluster.waitActive(); - cluster.triggerHeartbeats(); - Thread.sleep(1000); - // the report count should just continue to increase. - assertEquals(initialBRCount + i + 1, - providedDNInfo.getBlockReportCount()); - verifyFileLocation(i, 2); - } - } - - private DatanodeStorageInfo getProvidedDatanodeStorageInfo() { - ProvidedStorageMap providedStorageMap = - cluster.getNamesystem().getBlockManager().getProvidedStorageMap(); - return providedStorageMap.getProvidedStorageInfo(); - } - - @Test(timeout=30000) - public void testNamenodeRestart() throws Exception { - createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH, - FixedBlockResolver.class); - // 3 Datanodes, 2 PROVIDED and other DISK - startCluster(NNDIRPATH, 3, null, - new StorageType[][] { - {StorageType.PROVIDED, StorageType.DISK}, - {StorageType.PROVIDED, StorageType.DISK}, - {StorageType.DISK}}, - false); - - verifyFileLocation(numFiles - 1, 2); - cluster.restartNameNodes(); - cluster.waitActive(); - verifyFileLocation(numFiles - 1, 2); - } - - /** - * verify that the specified file has a valid provided location. - * @param fileIndex the index of the file to verify. - * @throws Exception - */ - private void verifyFileLocation(int fileIndex, int replication) - throws Exception { - DFSClient client = new DFSClient( - new InetSocketAddress("localhost", cluster.getNameNodePort()), - cluster.getConfiguration(0)); - if (fileIndex < numFiles && fileIndex >= 0) { - String filename = filePrefix + fileIndex + fileSuffix; - File file = new File(new Path(NAMEPATH, filename).toUri()); - long fileLen = file.length(); - long blockSize = conf.getLong(FixedBlockResolver.BLOCKSIZE, - FixedBlockResolver.BLOCKSIZE_DEFAULT); - long numLocatedBlocks = - fileLen == 0 ? 1 : (long) Math.ceil(fileLen * 1.0 / blockSize); - getAndCheckBlockLocations(client, "/" + filename, fileLen, - numLocatedBlocks, replication); - } - } - - @Test(timeout=30000) - public void testSetClusterID() throws Exception { - String clusterID = "PROVIDED-CLUSTER"; - createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH, - FixedBlockResolver.class, clusterID, TextFileRegionAliasMap.class); - // 2 Datanodes, 1 PROVIDED and other DISK - startCluster(NNDIRPATH, 2, null, - new StorageType[][] { - {StorageType.PROVIDED, StorageType.DISK}, - {StorageType.DISK}}, - false); - NameNode nn = cluster.getNameNode(); - assertEquals(clusterID, nn.getNamesystem().getClusterId()); - } - - @Test(timeout=30000) - public void testNumberOfProvidedLocations() throws Exception { - // set default replication to 4 - conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 4); - createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH, - FixedBlockResolver.class); - // start with 4 PROVIDED location - startCluster(NNDIRPATH, 4, - new StorageType[]{ - StorageType.PROVIDED, StorageType.DISK}, - null, - false); - int expectedLocations = 4; - for (int i = 0; i < numFiles; i++) { - verifyFileLocation(i, expectedLocations); - } - // stop 2 datanodes, one after the other and verify number of locations. - for (int i = 1; i <= 2; i++) { - DataNode dn = cluster.getDataNodes().get(0); - cluster.stopDataNode(0); - // make NameNode detect that datanode is down - BlockManagerTestUtil.noticeDeadDatanode(cluster.getNameNode(), - dn.getDatanodeId().getXferAddr()); - - expectedLocations = 4 - i; - for (int j = 0; j < numFiles; j++) { - verifyFileLocation(j, expectedLocations); - } - } - } - - @Test(timeout=30000) - public void testNumberOfProvidedLocationsManyBlocks() throws Exception { - // increase number of blocks per file to at least 10 blocks per file - conf.setLong(FixedBlockResolver.BLOCKSIZE, baseFileLen/10); - // set default replication to 4 - conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 4); - createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH, - FixedBlockResolver.class); - // start with 4 PROVIDED location - startCluster(NNDIRPATH, 4, - new StorageType[]{ - StorageType.PROVIDED, StorageType.DISK}, - null, - false); - int expectedLocations = 4; - for (int i = 0; i < numFiles; i++) { - verifyFileLocation(i, expectedLocations); - } - } - - - @Test - public void testInMemoryAliasMap() throws Exception { - conf.setClass(ImageWriter.Options.UGI_CLASS, - FsUGIResolver.class, UGIResolver.class); - conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS, - InMemoryLevelDBAliasMapClient.class, BlockAliasMap.class); - conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS, - "localhost:32445"); - File tempDirectory = - Files.createTempDirectory("in-memory-alias-map").toFile(); - File leveDBPath = new File(tempDirectory, bpid); - leveDBPath.mkdirs(); - conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR, - tempDirectory.getAbsolutePath()); - conf.setBoolean(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true); - conf.setInt(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES, 10); - InMemoryLevelDBAliasMapServer levelDBAliasMapServer = - new InMemoryLevelDBAliasMapServer(InMemoryAliasMap::init, bpid); - levelDBAliasMapServer.setConf(conf); - levelDBAliasMapServer.start(); - - createImage(new FSTreeWalk(NAMEPATH, conf), - NNDIRPATH, - FixedBlockResolver.class, "", - InMemoryLevelDBAliasMapClient.class); - levelDBAliasMapServer.close(); - - // start cluster with two datanodes, - // each with 1 PROVIDED volume and other DISK volume - startCluster(NNDIRPATH, 2, - new StorageType[] {StorageType.PROVIDED, StorageType.DISK}, - null, false); - verifyFileSystemContents(); - FileUtils.deleteDirectory(tempDirectory); - } - - private DatanodeDescriptor getDatanodeDescriptor(DatanodeManager dnm, - int dnIndex) throws Exception { - return dnm.getDatanode(cluster.getDataNodes().get(dnIndex).getDatanodeId()); - } - - private void startDecommission(FSNamesystem namesystem, DatanodeManager dnm, - int dnIndex) throws Exception { - namesystem.writeLock(); - DatanodeDescriptor dnDesc = getDatanodeDescriptor(dnm, dnIndex); - dnm.getDatanodeAdminManager().startDecommission(dnDesc); - namesystem.writeUnlock(); - } - - private void startMaintenance(FSNamesystem namesystem, DatanodeManager dnm, - int dnIndex) throws Exception { - namesystem.writeLock(); - DatanodeDescriptor dnDesc = getDatanodeDescriptor(dnm, dnIndex); - dnm.getDatanodeAdminManager().startMaintenance(dnDesc, Long.MAX_VALUE); - namesystem.writeUnlock(); - } - - private void stopMaintenance(FSNamesystem namesystem, DatanodeManager dnm, - int dnIndex) throws Exception { - namesystem.writeLock(); - DatanodeDescriptor dnDesc = getDatanodeDescriptor(dnm, dnIndex); - dnm.getDatanodeAdminManager().stopMaintenance(dnDesc); - namesystem.writeUnlock(); - } - - @Test - public void testDatanodeLifeCycle() throws Exception { - createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH, - FixedBlockResolver.class); - startCluster(NNDIRPATH, 3, - new StorageType[] {StorageType.PROVIDED, StorageType.DISK}, - null, false); - - int fileIndex = numFiles - 1; - - final BlockManager blockManager = cluster.getNamesystem().getBlockManager(); - final DatanodeManager dnm = blockManager.getDatanodeManager(); - - // to start, all 3 DNs are live in ProvidedDatanodeDescriptor. - verifyFileLocation(fileIndex, 3); - - // de-commision first DN; still get 3 replicas. - startDecommission(cluster.getNamesystem(), dnm, 0); - verifyFileLocation(fileIndex, 3); - - // remains the same even after heartbeats. - cluster.triggerHeartbeats(); - verifyFileLocation(fileIndex, 3); - - // start maintenance for 2nd DN; still get 3 replicas. - startMaintenance(cluster.getNamesystem(), dnm, 1); - verifyFileLocation(fileIndex, 3); - - DataNode dn1 = cluster.getDataNodes().get(0); - DataNode dn2 = cluster.getDataNodes().get(1); - - // stop the 1st DN while being decomissioned. - MiniDFSCluster.DataNodeProperties dn1Properties = cluster.stopDataNode(0); - BlockManagerTestUtil.noticeDeadDatanode(cluster.getNameNode(), - dn1.getDatanodeId().getXferAddr()); - - // get 2 locations - verifyFileLocation(fileIndex, 2); - - // stop dn2 while in maintenance. - MiniDFSCluster.DataNodeProperties dn2Properties = cluster.stopDataNode(1); - BlockManagerTestUtil.noticeDeadDatanode(cluster.getNameNode(), - dn2.getDatanodeId().getXferAddr()); - - // 2 valid locations will be found as blocks on nodes that die during - // maintenance are not marked for removal. - verifyFileLocation(fileIndex, 2); - - // stop the maintenance; get only 1 replicas - stopMaintenance(cluster.getNamesystem(), dnm, 0); - verifyFileLocation(fileIndex, 1); - - // restart the stopped DN. - cluster.restartDataNode(dn1Properties, true); - cluster.waitActive(); - - // reports all 3 replicas - verifyFileLocation(fileIndex, 2); - - cluster.restartDataNode(dn2Properties, true); - cluster.waitActive(); - - // reports all 3 replicas - verifyFileLocation(fileIndex, 3); - } - - @Test - public void testProvidedWithHierarchicalTopology() throws Exception { - conf.setClass(ImageWriter.Options.UGI_CLASS, FsUGIResolver.class, - UGIResolver.class); - String packageName = "org.apache.hadoop.hdfs.server.blockmanagement"; - String[] policies = new String[] { - "BlockPlacementPolicyDefault", - "BlockPlacementPolicyRackFaultTolerant", - "BlockPlacementPolicyWithNodeGroup", - "BlockPlacementPolicyWithUpgradeDomain"}; - createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH, - FixedBlockResolver.class); - String[] racks = - {"/pod0/rack0", "/pod0/rack0", "/pod0/rack1", "/pod0/rack1", - "/pod1/rack0", "/pod1/rack0", "/pod1/rack1", "/pod1/rack1" }; - for (String policy: policies) { - LOG.info("Using policy: " + packageName + "." + policy); - conf.set(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, packageName + "." + policy); - startCluster(NNDIRPATH, racks.length, - new StorageType[]{StorageType.PROVIDED, StorageType.DISK}, - null, false, racks); - verifyFileSystemContents(); - setAndUnsetReplication("/" + filePrefix + (numFiles - 1) + fileSuffix); - cluster.shutdown(); - } - } -}
--------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org