Repository: hadoop
Updated Branches:
  refs/heads/trunk c4ee6915a -> 9dcbdbdb5
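For orientation before the per-file diffs: this change pulls the hosts/exclude-file plumbing and the decommission helpers out of TestDecommission into a shared AdminStatesBaseTest, renaming decommissionNode/recommissionNode to takeNodeOutofService/putNodeInService (the added long argument is a maintenance expiration; 0 requests an ordinary decommission). The sketch below is reconstructed from the call sites in this patch and is illustrative only -- AdminStatesBaseTest itself is not part of this mail, so the class name TestDecommissionRoundTripExample and the exact helper signatures should be read as assumptions rather than committed code.

package org.apache.hadoop.hdfs;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.junit.Test;

/**
 * Illustrative only: a round-trip decommission test written against the
 * shared base-class helpers, mirroring the call sites in the patch below.
 */
public class TestDecommissionRoundTripExample extends AdminStatesBaseTest {

  @Test(timeout = 360000)
  public void testDecommissionAndRecommission() throws Exception {
    // Hosts/exclude-file setup now lives in the base class; tests only
    // tweak configuration through getConf() and start the cluster.
    getConf().setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 3);
    startCluster(1, 4);                     // 1 NameNode, 4 DataNodes

    FileSystem fileSys = getCluster().getFileSystem(0);
    Path file = new Path("/roundTrip.dat");
    writeFile(fileSys, file, 3);

    // Third argument is the maintenance expiration; 0 means plain decommission.
    DatanodeInfo decomNode = takeNodeOutofService(0, null, 0, null,
        AdminStates.DECOMMISSIONED);

    // Recommission and wait for the node to return to NORMAL.
    putNodeInService(0, decomNode);

    cleanupFile(fileSys, file);
    // Cluster teardown is assumed to happen in the base class's teardown hook.
  }
}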
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java index f6b5d8f..ddb8237 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java @@ -26,17 +26,13 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.concurrent.ExecutionException; import com.google.common.base.Supplier; import com.google.common.collect.Lists; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -64,11 +60,8 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.test.PathUtils; import org.apache.log4j.Level; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import org.mortbay.util.ajax.JSON; @@ -78,90 +71,9 @@ import org.slf4j.LoggerFactory; /** * This class tests the decommissioning of nodes. */ -public class TestDecommission { +public class TestDecommission extends AdminStatesBaseTest { public static final Logger LOG = LoggerFactory.getLogger(TestDecommission .class); - static final long seed = 0xDEADBEEFL; - static final int blockSize = 8192; - static final int fileSize = 16384; - static final int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds - static final int BLOCKREPORT_INTERVAL_MSEC = 1000; //block report in msec - static final int NAMENODE_REPLICATION_INTERVAL = 1; //replication interval - - final Random myrand = new Random(); - Path dir; - Path hostsFile; - Path excludeFile; - FileSystem localFileSys; - Configuration conf; - MiniDFSCluster cluster = null; - - @Before - public void setup() throws IOException { - conf = new HdfsConfiguration(); - // Set up the hosts/exclude files. 
- localFileSys = FileSystem.getLocal(conf); - Path workingDir = localFileSys.getWorkingDirectory(); - dir = new Path(workingDir, PathUtils.getTestDirName(getClass()) + "/work-dir/decommission"); - hostsFile = new Path(dir, "hosts"); - excludeFile = new Path(dir, "exclude"); - - // Setup conf - conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false); - conf.set(DFSConfigKeys.DFS_HOSTS, hostsFile.toUri().getPath()); - conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath()); - conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000); - conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL); - conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); - conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, BLOCKREPORT_INTERVAL_MSEC); - conf.setInt(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 4); - conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, NAMENODE_REPLICATION_INTERVAL); - - writeConfigFile(hostsFile, null); - writeConfigFile(excludeFile, null); - } - - @After - public void teardown() throws IOException { - cleanupFile(localFileSys, dir); - if (cluster != null) { - cluster.shutdown(); - cluster = null; - } - } - - private void writeConfigFile(Path name, List<String> nodes) - throws IOException { - // delete if it already exists - if (localFileSys.exists(name)) { - localFileSys.delete(name, true); - } - - FSDataOutputStream stm = localFileSys.create(name); - - if (nodes != null) { - for (Iterator<String> it = nodes.iterator(); it.hasNext();) { - String node = it.next(); - stm.writeBytes(node); - stm.writeBytes("\n"); - } - } - stm.close(); - } - - private void writeFile(FileSystem fileSys, Path name, int repl) - throws IOException { - // create and write a file that contains three blocks of data - FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf() - .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096), - (short) repl, blockSize); - byte[] buffer = new byte[fileSize]; - Random rand = new Random(seed); - rand.nextBytes(buffer); - stm.write(buffer); - stm.close(); - LOG.info("Created file " + name + " with " + repl + " replicas."); - } /** * Verify that the number of replicas are as expected for each block in @@ -223,128 +135,6 @@ public class TestDecommission { return null; } - private void cleanupFile(FileSystem fileSys, Path name) throws IOException { - assertTrue(fileSys.exists(name)); - fileSys.delete(name, true); - assertTrue(!fileSys.exists(name)); - } - - /* - * decommission the DN at index dnIndex or one random node if dnIndex is set - * to -1 and wait for the node to reach the given {@code waitForState}. - */ - private DatanodeInfo decommissionNode(int nnIndex, - String datanodeUuid, - ArrayList<DatanodeInfo>decommissionedNodes, - AdminStates waitForState) - throws IOException { - DFSClient client = getDfsClient(cluster.getNameNode(nnIndex), conf); - DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); - - // - // pick one datanode randomly unless the caller specifies one. 
- // - int index = 0; - if (datanodeUuid == null) { - boolean found = false; - while (!found) { - index = myrand.nextInt(info.length); - if (!info[index].isDecommissioned()) { - found = true; - } - } - } else { - // The caller specifies a DN - for (; index < info.length; index++) { - if (info[index].getDatanodeUuid().equals(datanodeUuid)) { - break; - } - } - if (index == info.length) { - throw new IOException("invalid datanodeUuid " + datanodeUuid); - } - } - String nodename = info[index].getXferAddr(); - LOG.info("Decommissioning node: " + nodename); - - // write nodename into the exclude file. - ArrayList<String> nodes = new ArrayList<String>(); - if (decommissionedNodes != null) { - for (DatanodeInfo dn : decommissionedNodes) { - nodes.add(dn.getName()); - } - } - nodes.add(nodename); - writeConfigFile(excludeFile, nodes); - refreshNodes(cluster.getNamesystem(nnIndex), conf); - DatanodeInfo ret = NameNodeAdapter.getDatanode( - cluster.getNamesystem(nnIndex), info[index]); - waitNodeState(ret, waitForState); - return ret; - } - - /* Ask a specific NN to stop decommission of the datanode and wait for each - * to reach the NORMAL state. - */ - private void recommissionNode(int nnIndex, DatanodeInfo decommissionedNode) throws IOException { - LOG.info("Recommissioning node: " + decommissionedNode); - writeConfigFile(excludeFile, null); - refreshNodes(cluster.getNamesystem(nnIndex), conf); - waitNodeState(decommissionedNode, AdminStates.NORMAL); - - } - - /* - * Wait till node is fully decommissioned. - */ - private void waitNodeState(DatanodeInfo node, - AdminStates state) { - boolean done = state == node.getAdminState(); - while (!done) { - LOG.info("Waiting for node " + node + " to change state to " - + state + " current state: " + node.getAdminState()); - try { - Thread.sleep(HEARTBEAT_INTERVAL * 500); - } catch (InterruptedException e) { - // nothing - } - done = state == node.getAdminState(); - } - LOG.info("node " + node + " reached the state " + state); - } - - /* Get DFSClient to the namenode */ - private static DFSClient getDfsClient(NameNode nn, - Configuration conf) throws IOException { - return new DFSClient(nn.getNameNodeAddress(), conf); - } - - /* Validate cluster has expected number of datanodes */ - private static void validateCluster(DFSClient client, int numDNs) - throws IOException { - DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); - assertEquals("Number of Datanodes ", numDNs, info.length); - } - - /** Start a MiniDFSCluster - * @throws IOException */ - private void startCluster(int numNameNodes, int numDatanodes, - Configuration conf) throws IOException { - cluster = new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(numNameNodes)) - .numDataNodes(numDatanodes).build(); - cluster.waitActive(); - for (int i = 0; i < numNameNodes; i++) { - DFSClient client = getDfsClient(cluster.getNameNode(i), conf); - validateCluster(client, numDatanodes); - } - } - - static void refreshNodes(final FSNamesystem ns, final Configuration conf - ) throws IOException { - ns.getBlockManager().getDatanodeManager().refreshNodes(conf); - } - private void verifyStats(NameNode namenode, FSNamesystem fsn, DatanodeInfo info, DataNode node, boolean decommissioning) throws InterruptedException, IOException { @@ -376,7 +166,7 @@ public class TestDecommission { public void testDecommission() throws IOException { testDecommission(1, 6); } - + /** * Tests decommission with replicas on the target datanode cannot be migrated * to other datanodes 
and satisfy the replication factor. Make sure the @@ -387,8 +177,8 @@ public class TestDecommission { LOG.info("Starting test testDecommission"); int numNamenodes = 1; int numDatanodes = 4; - conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 3); - startCluster(numNamenodes, numDatanodes, conf); + getConf().setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 3); + startCluster(numNamenodes, numDatanodes); ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList = new ArrayList<ArrayList<DatanodeInfo>>( numNamenodes); @@ -399,8 +189,8 @@ public class TestDecommission { // Start decommissioning one namenode at a time ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(0); - FileSystem fileSys = cluster.getFileSystem(0); - FSNamesystem ns = cluster.getNamesystem(0); + FileSystem fileSys = getCluster().getFileSystem(0); + FSNamesystem ns = getCluster().getNamesystem(0); writeFile(fileSys, file1, replicas); @@ -408,14 +198,14 @@ public class TestDecommission { int liveDecomissioned = ns.getNumDecomLiveDataNodes(); // Decommission one node. Verify that node is decommissioned. - DatanodeInfo decomNode = decommissionNode(0, null, decommissionedNodes, - AdminStates.DECOMMISSIONED); + DatanodeInfo decomNode = takeNodeOutofService(0, null, 0, + decommissionedNodes, AdminStates.DECOMMISSIONED); decommissionedNodes.add(decomNode); assertEquals(deadDecomissioned, ns.getNumDecomDeadDataNodes()); assertEquals(liveDecomissioned + 1, ns.getNumDecomLiveDataNodes()); // Ensure decommissioned datanode is not automatically shutdown - DFSClient client = getDfsClient(cluster.getNameNode(0), conf); + DFSClient client = getDfsClient(0); assertEquals("All datanodes must be alive", numDatanodes, client.datanodeReport(DatanodeReportType.LIVE).length); assertNull(checkFile(fileSys, file1, replicas, decomNode.getXferAddr(), @@ -424,9 +214,8 @@ public class TestDecommission { // Restart the cluster and ensure recommissioned datanodes // are allowed to register with the namenode - cluster.shutdown(); - startCluster(1, 4, conf); - cluster.shutdown(); + shutdownCluster(); + startCluster(1, 4); } /** @@ -449,26 +238,22 @@ public class TestDecommission { */ @Test(timeout=360000) public void testDecommissionOnStandby() throws Exception { - Configuration hdfsConf = new HdfsConfiguration(conf); - hdfsConf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); - hdfsConf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 30000); - hdfsConf.setInt(DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY, 2); + getConf().setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); + getConf().setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, + 30000); + getConf().setInt( + DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY, 2); // The time to wait so that the slow DN's heartbeat is considered old // by BlockPlacementPolicyDefault and thus will choose that DN for // excess replica. long slowHeartbeatDNwaitTime = - hdfsConf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, - DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000 * (hdfsConf.getInt( - DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY, + getConf().getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, + DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000 * (getConf(). 
+ getInt(DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY, DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT) + 1); - cluster = new MiniDFSCluster.Builder(hdfsConf) - .nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(3).build(); - - cluster.transitionToActive(0); - cluster.waitActive(); - + startSimpleHACluster(3); // Step 1, create a cluster with 4 DNs. Blocks are stored on the first 3 DNs. // The last DN is empty. Also configure the last DN to have slow heartbeat @@ -478,29 +263,29 @@ public class TestDecommission { // same as # of DNs, each DN will have a replica for any block. Path file1 = new Path("testDecommissionHA.dat"); int replicas = 3; - FileSystem activeFileSys = cluster.getFileSystem(0); + FileSystem activeFileSys = getCluster().getFileSystem(0); writeFile(activeFileSys, file1, replicas); - HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0), - cluster.getNameNode(1)); + HATestUtil.waitForStandbyToCatchUp(getCluster().getNameNode(0), + getCluster().getNameNode(1)); // Step 1.b, start a DN with slow heartbeat, so that we can know for sure it // will be chosen as the target of excess replica during recommission. - hdfsConf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30); - cluster.startDataNodes(hdfsConf, 1, true, null, null, null); - DataNode lastDN = cluster.getDataNodes().get(3); + getConf().setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30); + getCluster().startDataNodes(getConf(), 1, true, null, null, null); + DataNode lastDN = getCluster().getDataNodes().get(3); lastDN.getDatanodeUuid(); // Step 2, decommission the first DN at both ANN and SBN. - DataNode firstDN = cluster.getDataNodes().get(0); + DataNode firstDN = getCluster().getDataNodes().get(0); // Step 2.a, ask ANN to decomm the first DN - DatanodeInfo decommissionedNodeFromANN = decommissionNode( - 0, firstDN.getDatanodeUuid(), null, AdminStates.DECOMMISSIONED); + DatanodeInfo decommissionedNodeFromANN = takeNodeOutofService( + 0, firstDN.getDatanodeUuid(), 0, null, AdminStates.DECOMMISSIONED); // Step 2.b, ask SBN to decomm the first DN - DatanodeInfo decomNodeFromSBN = decommissionNode(1, firstDN.getDatanodeUuid(), null, - AdminStates.DECOMMISSIONED); + DatanodeInfo decomNodeFromSBN = takeNodeOutofService(1, + firstDN.getDatanodeUuid(), 0, null, AdminStates.DECOMMISSIONED); // Step 3, recommission the first DN on SBN and ANN to create excess replica // It recommissions the node on SBN first to create potential @@ -520,7 +305,7 @@ public class TestDecommission { // After the fix, // After recommissionNode finishes, SBN has 4 live replicas ( 0, 1, 2, 3 ) Thread.sleep(slowHeartbeatDNwaitTime); - recommissionNode(1, decomNodeFromSBN); + putNodeInService(1, decomNodeFromSBN); // Step 3.b, ask ANN to recommission the first DN. // To verify the fix, the test makes sure the excess replica picked by ANN @@ -529,41 +314,41 @@ public class TestDecommission { // by ANN. // 1. restore LastDNprop's heartbeat interval. // 2. Make next-to-last DN's heartbeat slow. 
- MiniDFSCluster.DataNodeProperties LastDNprop = cluster.stopDataNode(3); - LastDNprop.conf.setLong( + MiniDFSCluster.DataNodeProperties lastDNprop = + getCluster().stopDataNode(3); + lastDNprop.conf.setLong( DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL); - cluster.restartDataNode(LastDNprop); - - MiniDFSCluster.DataNodeProperties nextToLastDNprop = cluster.stopDataNode(2); - nextToLastDNprop.conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30); - cluster.restartDataNode(nextToLastDNprop); - cluster.waitActive(); + getCluster().restartDataNode(lastDNprop); + + MiniDFSCluster.DataNodeProperties nextToLastDNprop = + getCluster().stopDataNode(2); + nextToLastDNprop.conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, + 30); + getCluster().restartDataNode(nextToLastDNprop); + getCluster().waitActive(); Thread.sleep(slowHeartbeatDNwaitTime); - recommissionNode(0, decommissionedNodeFromANN); + putNodeInService(0, decommissionedNodeFromANN); // Step 3.c, make sure the DN has deleted the block and report to NNs - cluster.triggerHeartbeats(); - HATestUtil.waitForDNDeletions(cluster); - cluster.triggerDeletionReports(); + getCluster().triggerHeartbeats(); + HATestUtil.waitForDNDeletions(getCluster()); + getCluster().triggerDeletionReports(); // Step 4, decommission the first DN on both ANN and SBN // With the fix to make sure SBN no longer marks excess replica // during recommission, SBN's decommission can finish properly - decommissionNode(0, firstDN.getDatanodeUuid(), null, + takeNodeOutofService(0, firstDN.getDatanodeUuid(), 0, null, AdminStates.DECOMMISSIONED); // Ask SBN to decomm the first DN - decommissionNode(1, firstDN.getDatanodeUuid(), null, + takeNodeOutofService(1, firstDN.getDatanodeUuid(), 0, null, AdminStates.DECOMMISSIONED); - - cluster.shutdown(); - } private void testDecommission(int numNamenodes, int numDatanodes) throws IOException { LOG.info("Starting test testDecommission"); - startCluster(numNamenodes, numDatanodes, conf); + startCluster(numNamenodes, numDatanodes); ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList = new ArrayList<ArrayList<DatanodeInfo>>(numNamenodes); @@ -577,8 +362,8 @@ public class TestDecommission { // Start decommissioning one namenode at a time for (int i = 0; i < numNamenodes; i++) { ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(i); - FileSystem fileSys = cluster.getFileSystem(i); - FSNamesystem ns = cluster.getNamesystem(i); + FileSystem fileSys = getCluster().getFileSystem(i); + FSNamesystem ns = getCluster().getNamesystem(i); writeFile(fileSys, file1, replicas); @@ -586,14 +371,14 @@ public class TestDecommission { int liveDecomissioned = ns.getNumDecomLiveDataNodes(); // Decommission one node. Verify that node is decommissioned. 
- DatanodeInfo decomNode = decommissionNode(i, null, decommissionedNodes, - AdminStates.DECOMMISSIONED); + DatanodeInfo decomNode = takeNodeOutofService(i, null, 0, + decommissionedNodes, AdminStates.DECOMMISSIONED); decommissionedNodes.add(decomNode); assertEquals(deadDecomissioned, ns.getNumDecomDeadDataNodes()); assertEquals(liveDecomissioned + 1, ns.getNumDecomLiveDataNodes()); // Ensure decommissioned datanode is not automatically shutdown - DFSClient client = getDfsClient(cluster.getNameNode(i), conf); + DFSClient client = getDfsClient(i); assertEquals("All datanodes must be alive", numDatanodes, client.datanodeReport(DatanodeReportType.LIVE).length); // wait for the block to be replicated @@ -616,9 +401,8 @@ public class TestDecommission { // Restart the cluster and ensure decommissioned datanodes // are allowed to register with the namenode - cluster.shutdown(); - startCluster(numNamenodes, numDatanodes, conf); - cluster.shutdown(); + shutdownCluster(); + startCluster(numNamenodes, numDatanodes); } /** @@ -630,13 +414,13 @@ public class TestDecommission { try { LOG.info("Starting test testRecommission"); - startCluster(1, numDatanodes, conf); + startCluster(1, numDatanodes); final Path file1 = new Path("testDecommission.dat"); final int replicas = numDatanodes - 1; ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList(); - final FileSystem fileSys = cluster.getFileSystem(); + final FileSystem fileSys = getCluster().getFileSystem(); // Write a file to n-1 datanodes writeFile(fileSys, file1, replicas); @@ -647,25 +431,24 @@ public class TestDecommission { replicas, loc.getHosts().length); final String toDecomHost = loc.getNames()[0]; String toDecomUuid = null; - for (DataNode d : cluster.getDataNodes()) { + for (DataNode d : getCluster().getDataNodes()) { if (d.getDatanodeId().getXferAddr().equals(toDecomHost)) { toDecomUuid = d.getDatanodeId().getDatanodeUuid(); break; } } assertNotNull("Could not find a dn with the block!", toDecomUuid); - final DatanodeInfo decomNode = - decommissionNode(0, toDecomUuid, decommissionedNodes, - AdminStates.DECOMMISSIONED); + final DatanodeInfo decomNode = takeNodeOutofService(0, toDecomUuid, + 0, decommissionedNodes, AdminStates.DECOMMISSIONED); decommissionedNodes.add(decomNode); final BlockManager blockManager = - cluster.getNamesystem().getBlockManager(); + getCluster().getNamesystem().getBlockManager(); final DatanodeManager datanodeManager = blockManager.getDatanodeManager(); BlockManagerTestUtil.recheckDecommissionState(datanodeManager); // Ensure decommissioned datanode is not automatically shutdown - DFSClient client = getDfsClient(cluster.getNameNode(), conf); + DFSClient client = getDfsClient(0); assertEquals("All datanodes must be alive", numDatanodes, client.datanodeReport(DatanodeReportType.LIVE).length); @@ -692,15 +475,13 @@ public class TestDecommission { }, 500, 30000); // redecommission and wait for over-replication to be fixed - recommissionNode(0, decomNode); + putNodeInService(0, decomNode); BlockManagerTestUtil.recheckDecommissionState(datanodeManager); - DFSTestUtil.waitForReplication(cluster, b, 1, replicas, 0); + DFSTestUtil.waitForReplication(getCluster(), b, 1, replicas, 0); cleanupFile(fileSys, file1); } finally { - if (cluster != null) { - cluster.shutdown(); - } + shutdownCluster(); } } @@ -726,35 +507,33 @@ public class TestDecommission { InterruptedException { LOG.info("Starting test testClusterStats"); int numDatanodes = 1; - startCluster(numNameNodes, numDatanodes, conf); + startCluster(numNameNodes, 
numDatanodes); for (int i = 0; i < numNameNodes; i++) { - FileSystem fileSys = cluster.getFileSystem(i); + FileSystem fileSys = getCluster().getFileSystem(i); Path file = new Path("testClusterStats.dat"); writeFile(fileSys, file, 1); - FSNamesystem fsn = cluster.getNamesystem(i); - NameNode namenode = cluster.getNameNode(i); + FSNamesystem fsn = getCluster().getNamesystem(i); + NameNode namenode = getCluster().getNameNode(i); - DatanodeInfo decomInfo = decommissionNode(i, null, null, + DatanodeInfo decomInfo = takeNodeOutofService(i, null, 0, null, AdminStates.DECOMMISSION_INPROGRESS); DataNode decomNode = getDataNode(decomInfo); // Check namenode stats for multiple datanode heartbeats verifyStats(namenode, fsn, decomInfo, decomNode, true); // Stop decommissioning and verify stats - writeConfigFile(excludeFile, null); - refreshNodes(fsn, conf); DatanodeInfo retInfo = NameNodeAdapter.getDatanode(fsn, decomInfo); + putNodeInService(i, retInfo); DataNode retNode = getDataNode(decomInfo); - waitNodeState(retInfo, AdminStates.NORMAL); verifyStats(namenode, fsn, retInfo, retNode, false); } } private DataNode getDataNode(DatanodeInfo decomInfo) { DataNode decomNode = null; - for (DataNode dn: cluster.getDataNodes()) { + for (DataNode dn: getCluster().getDataNodes()) { if (decomInfo.equals(dn.getDatanodeId())) { decomNode = dn; break; @@ -789,22 +568,16 @@ public class TestDecommission { public void testHostsFile(int numNameNodes) throws IOException, InterruptedException { int numDatanodes = 1; - cluster = new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(numNameNodes)) - .numDataNodes(numDatanodes).setupHostsFile(true).build(); - cluster.waitActive(); - + startCluster(numNameNodes, numDatanodes, true, null, false); + // Now empty hosts file and ensure the datanode is disallowed // from talking to namenode, resulting in it's shutdown. 
- ArrayList<String>list = new ArrayList<String>(); final String bogusIp = "127.0.30.1"; - list.add(bogusIp); - writeConfigFile(hostsFile, list); - + initIncludeHost(bogusIp); + for (int j = 0; j < numNameNodes; j++) { - refreshNodes(cluster.getNamesystem(j), conf); - - DFSClient client = getDfsClient(cluster.getNameNode(j), conf); + refreshNodes(j); + DFSClient client = getDfsClient(j); DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); for (int i = 0 ; i < 5 && info.length != 0; i++) { LOG.info("Waiting for datanode to be marked dead"); @@ -828,19 +601,20 @@ public class TestDecommission { LOG.info("Starting test testDecommissionWithOpenfile"); //At most 4 nodes will be decommissioned - startCluster(1, 7, conf); + startCluster(1, 7); - FileSystem fileSys = cluster.getFileSystem(0); - FSNamesystem ns = cluster.getNamesystem(0); - + FileSystem fileSys = getCluster().getFileSystem(0); + FSNamesystem ns = getCluster().getNamesystem(0); + String openFile = "/testDecommissionWithOpenfile.dat"; writeFile(fileSys, new Path(openFile), (short)3); // make sure the file was open for write FSDataOutputStream fdos = fileSys.append(new Path(openFile)); - LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(cluster.getNameNode(0), openFile, 0, fileSize); - + LocatedBlocks lbs = NameNodeAdapter.getBlockLocations( + getCluster().getNameNode(0), openFile, 0, fileSize); + DatanodeInfo[] dnInfos4LastBlock = lbs.getLastLocatedBlock().getLocations(); DatanodeInfo[] dnInfos4FirstBlock = lbs.get(0).getLocations(); @@ -863,12 +637,12 @@ public class TestDecommission { //decommission one of the 3 nodes which have last block nodes.add(dnInfos4LastBlock[0].getXferAddr()); dnInfos.add(dm.getDatanode(dnInfos4LastBlock[0])); - - writeConfigFile(excludeFile, nodes); - refreshNodes(ns, conf); + + initExcludeHosts(nodes); + refreshNodes(0); for (DatanodeInfo dn : dnInfos) { waitNodeState(dn, AdminStates.DECOMMISSIONED); - } + } fdos.close(); } @@ -882,31 +656,32 @@ public class TestDecommission { int numNamenodes = 1; int numDatanodes = 1; int replicas = 1; - conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, + getConf().setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT); - conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 5); + getConf().setLong(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 5); - startCluster(numNamenodes, numDatanodes, conf); + startCluster(numNamenodes, numDatanodes); Path file1 = new Path("testDecommissionWithNamenodeRestart.dat"); - FileSystem fileSys = cluster.getFileSystem(); + FileSystem fileSys = getCluster().getFileSystem(); writeFile(fileSys, file1, replicas); - DFSClient client = getDfsClient(cluster.getNameNode(), conf); + DFSClient client = getDfsClient(0); DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); DatanodeID excludedDatanodeID = info[0]; String excludedDatanodeName = info[0].getXferAddr(); - writeConfigFile(excludeFile, new ArrayList<String>(Arrays.asList(excludedDatanodeName))); + initExcludeHost(excludedDatanodeName); //Add a new datanode to cluster - cluster.startDataNodes(conf, 1, true, null, null, null, null); + getCluster().startDataNodes(getConf(), 1, true, null, null, null, null); numDatanodes+=1; - assertEquals("Number of datanodes should be 2 ", 2, cluster.getDataNodes().size()); + assertEquals("Number of datanodes should be 2 ", 2, + getCluster().getDataNodes().size()); //Restart the namenode - cluster.restartNameNode(); + 
getCluster().restartNameNode(); DatanodeInfo datanodeInfo = NameNodeAdapter.getDatanode( - cluster.getNamesystem(), excludedDatanodeID); + getCluster().getNamesystem(), excludedDatanodeID); waitNodeState(datanodeInfo, AdminStates.DECOMMISSIONED); // Ensure decommissioned datanode is not automatically shutdown @@ -919,9 +694,8 @@ public class TestDecommission { cleanupFile(fileSys, file1); // Restart the cluster and ensure recommissioned datanodes // are allowed to register with the namenode - cluster.shutdown(); - startCluster(numNamenodes, numDatanodes, conf); - cluster.shutdown(); + shutdownCluster(); + startCluster(numNamenodes, numDatanodes); } /** @@ -933,30 +707,30 @@ public class TestDecommission { int numNamenodes = 1; int numDatanodes = 2; - startCluster(numNamenodes, numDatanodes, conf); + startCluster(numNamenodes, numDatanodes); - DFSClient client = getDfsClient(cluster.getNameNode(), conf); + DFSClient client = getDfsClient(0); DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); DatanodeInfo excludedDatanode = info[0]; String excludedDatanodeName = info[0].getXferAddr(); - writeConfigFile(hostsFile, new ArrayList<String>(Arrays.asList( - excludedDatanodeName, info[1].getXferAddr()))); - decommissionNode(0, excludedDatanode.getDatanodeUuid(), null, + List<String> hosts = new ArrayList<String>(Arrays.asList( + excludedDatanodeName, info[1].getXferAddr())); + initIncludeHosts(hosts.toArray(new String[hosts.size()])); + takeNodeOutofService(0, excludedDatanode.getDatanodeUuid(), 0, null, AdminStates.DECOMMISSIONED); - cluster.stopDataNode(excludedDatanodeName); + getCluster().stopDataNode(excludedDatanodeName); DFSTestUtil.waitForDatanodeState( - cluster, excludedDatanode.getDatanodeUuid(), false, 20000); + getCluster(), excludedDatanode.getDatanodeUuid(), false, 20000); //Restart the namenode - cluster.restartNameNode(); + getCluster().restartNameNode(); assertEquals("There should be one node alive", 1, client.datanodeReport(DatanodeReportType.LIVE).length); assertEquals("There should be one node dead", 1, client.datanodeReport(DatanodeReportType.DEAD).length); - cluster.shutdown(); } /** @@ -976,7 +750,6 @@ public class TestDecommission { @Ignore @Test(timeout=360000) public void testIncludeByRegistrationName() throws Exception { - Configuration hdfsConf = new Configuration(conf); // Any IPv4 address starting with 127 functions as a "loopback" address // which is connected to the current host. So by choosing 127.0.0.100 // as our registration name, we have chosen a name which is also a valid @@ -985,26 +758,21 @@ public class TestDecommission { // to deal with DNS in this test. final String registrationName = "127.0.0.100"; final String nonExistentDn = "127.0.0.10"; - hdfsConf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, registrationName); - cluster = new MiniDFSCluster.Builder(hdfsConf) - .numDataNodes(1).checkDataNodeHostConfig(true) - .setupHostsFile(true).build(); - cluster.waitActive(); + getConf().set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, registrationName); + startCluster(1, 1, false, null, true); // Set up an includes file that doesn't have our datanode. - ArrayList<String> nodes = new ArrayList<String>(); - nodes.add(nonExistentDn); - writeConfigFile(hostsFile, nodes); - refreshNodes(cluster.getNamesystem(0), hdfsConf); + initIncludeHost(nonExistentDn); + refreshNodes(0); // Wait for the DN to be marked dead. 
LOG.info("Waiting for DN to be marked as dead."); - final DFSClient client = getDfsClient(cluster.getNameNode(0), hdfsConf); + final DFSClient client = getDfsClient(0); GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { BlockManagerTestUtil - .checkHeartbeat(cluster.getNamesystem().getBlockManager()); + .checkHeartbeat(getCluster().getNamesystem().getBlockManager()); try { DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.DEAD); return info.length == 1; @@ -1017,13 +785,11 @@ public class TestDecommission { // Use a non-empty include file with our registration name. // It should work. - int dnPort = cluster.getDataNodes().get(0).getXferPort(); - nodes = new ArrayList<String>(); - nodes.add(registrationName + ":" + dnPort); - writeConfigFile(hostsFile, nodes); - refreshNodes(cluster.getNamesystem(0), hdfsConf); - cluster.restartDataNode(0); - cluster.triggerHeartbeats(); + int dnPort = getCluster().getDataNodes().get(0).getXferPort(); + initIncludeHost(registrationName + ":" + dnPort); + refreshNodes(0); + getCluster().restartDataNode(0); + getCluster().triggerHeartbeats(); // Wait for the DN to come back. LOG.info("Waiting for DN to come back."); @@ -1031,7 +797,7 @@ public class TestDecommission { @Override public Boolean get() { BlockManagerTestUtil - .checkHeartbeat(cluster.getNamesystem().getBlockManager()); + .checkHeartbeat(getCluster().getNamesystem().getBlockManager()); try { DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.LIVE); if (info.length == 1) { @@ -1050,20 +816,19 @@ public class TestDecommission { @Test(timeout=120000) public void testBlocksPerInterval() throws Exception { - Configuration newConf = new Configuration(conf); org.apache.log4j.Logger.getLogger(DecommissionManager.class) .setLevel(Level.TRACE); // Turn the blocks per interval way down - newConf.setInt( + getConf().setInt( DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY, 3); // Disable the normal monitor runs - newConf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, + getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, Integer.MAX_VALUE); - startCluster(1, 3, newConf); - final FileSystem fs = cluster.getFileSystem(); + startCluster(1, 3); + final FileSystem fs = getCluster().getFileSystem(); final DatanodeManager datanodeManager = - cluster.getNamesystem().getBlockManager().getDatanodeManager(); + getCluster().getNamesystem().getBlockManager().getDatanodeManager(); final DecommissionManager decomManager = datanodeManager.getDecomManager(); // Write a 3 block file, so each node has one block. Should scan 3 nodes. 
@@ -1085,10 +850,9 @@ public class TestDecommission { throws IOException, ExecutionException, InterruptedException { // Decom all nodes ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList(); - for (DataNode d: cluster.getDataNodes()) { - DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(), - decommissionedNodes, - AdminStates.DECOMMISSION_INPROGRESS); + for (DataNode d: getCluster().getDataNodes()) { + DatanodeInfo dn = takeNodeOutofService(0, d.getDatanodeUuid(), 0, + decommissionedNodes, AdminStates.DECOMMISSION_INPROGRESS); decommissionedNodes.add(dn); } // Run decom scan and check @@ -1097,26 +861,25 @@ public class TestDecommission { decomManager.getNumNodesChecked()); // Recommission all nodes for (DatanodeInfo dn : decommissionedNodes) { - recommissionNode(0, dn); + putNodeInService(0, dn); } } @Test(timeout=120000) public void testPendingNodes() throws Exception { - Configuration newConf = new Configuration(conf); org.apache.log4j.Logger.getLogger(DecommissionManager.class) .setLevel(Level.TRACE); // Only allow one node to be decom'd at a time - newConf.setInt( + getConf().setInt( DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES, 1); // Disable the normal monitor runs - newConf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, + getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, Integer.MAX_VALUE); - startCluster(1, 3, newConf); - final FileSystem fs = cluster.getFileSystem(); + startCluster(1, 3); + final FileSystem fs = getCluster().getFileSystem(); final DatanodeManager datanodeManager = - cluster.getNamesystem().getBlockManager().getDatanodeManager(); + getCluster().getNamesystem().getBlockManager().getDatanodeManager(); final DecommissionManager decomManager = datanodeManager.getDecomManager(); // Keep a file open to prevent decom from progressing @@ -1125,16 +888,15 @@ public class TestDecommission { // Flush and trigger block reports so the block definitely shows up on NN open1.write(123); open1.hflush(); - for (DataNode d: cluster.getDataNodes()) { + for (DataNode d: getCluster().getDataNodes()) { DataNodeTestUtils.triggerBlockReport(d); } // Decom two nodes, so one is still alive ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList(); for (int i=0; i<2; i++) { - final DataNode d = cluster.getDataNodes().get(i); - DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(), - decommissionedNodes, - AdminStates.DECOMMISSION_INPROGRESS); + final DataNode d = getCluster().getDataNodes().get(i); + DatanodeInfo dn = takeNodeOutofService(0, d.getDatanodeUuid(), 0, + decommissionedNodes, AdminStates.DECOMMISSION_INPROGRESS); decommissionedNodes.add(dn); } @@ -1145,10 +907,9 @@ public class TestDecommission { // Close file, try to decom the last node, should get stuck in tracked open1.close(); - final DataNode d = cluster.getDataNodes().get(2); - DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(), - decommissionedNodes, - AdminStates.DECOMMISSION_INPROGRESS); + final DataNode d = getCluster().getDataNodes().get(2); + DatanodeInfo dn = takeNodeOutofService(0, d.getDatanodeUuid(), 0, + decommissionedNodes, AdminStates.DECOMMISSION_INPROGRESS); decommissionedNodes.add(dn); BlockManagerTestUtil.recheckDecommissionState(datanodeManager); @@ -1171,16 +932,11 @@ public class TestDecommission { */ @Test public void testCountOnDecommissionedNodeList() throws IOException{ - conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); - conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1); 
+ getConf().setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); + getConf().setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, + 1); try { - cluster = - new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(1)) - .numDataNodes(1).build(); - cluster.waitActive(); - DFSClient client = getDfsClient(cluster.getNameNode(0), conf); - validateCluster(client, 1); + startCluster(1, 1); ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList = new ArrayList<ArrayList<DatanodeInfo>>(1); @@ -1188,10 +944,10 @@ public class TestDecommission { // Move datanode1 to Decommissioned state ArrayList<DatanodeInfo> decommissionedNode = namenodeDecomList.get(0); - decommissionNode(0, null, - decommissionedNode, AdminStates.DECOMMISSIONED); + takeNodeOutofService(0, null, 0, decommissionedNode, + AdminStates.DECOMMISSIONED); - FSNamesystem ns = cluster.getNamesystem(0); + FSNamesystem ns = getCluster().getNamesystem(0); DatanodeManager datanodeManager = ns.getBlockManager().getDatanodeManager(); List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>(); @@ -1202,7 +958,7 @@ public class TestDecommission { datanodeManager.fetchDatanodes(live, null, true); assertTrue(0==live.size()); }finally { - cluster.shutdown(); + shutdownCluster(); } } @@ -1235,21 +991,15 @@ public class TestDecommission { Map<String, Map<String, String>> usage = null; DatanodeInfo decommissionedNodeInfo = null; String zeroNodeUsage = "0.00%"; - conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); - conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); - conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1); + getConf().setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); + getConf().setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); + getConf().setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, + 1); FileSystem fileSys = null; Path file1 = new Path("testNodeUsage.dat"); try { - SimulatedFSDataset.setFactory(conf); - cluster = - new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(1)) - .numDataNodes(numDatanodes) - .simulatedCapacities(nodesCapacity).build(); - cluster.waitActive(); - DFSClient client = getDfsClient(cluster.getNameNode(0), conf); - validateCluster(client, numDatanodes); + SimulatedFSDataset.setFactory(getConf()); + startCluster(1, numDatanodes, false, nodesCapacity, false); ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList = new ArrayList<ArrayList<DatanodeInfo>>(1); @@ -1258,12 +1008,12 @@ public class TestDecommission { if (decommissionState == AdminStates.DECOMMISSIONED) { // Move datanode1 to Decommissioned state ArrayList<DatanodeInfo> decommissionedNode = namenodeDecomList.get(0); - decommissionedNodeInfo = decommissionNode(0, null, + decommissionedNodeInfo = takeNodeOutofService(0, null, 0, decommissionedNode, decommissionState); } // Write a file(replica 1).Hence will be written to only one live node. - fileSys = cluster.getFileSystem(0); - FSNamesystem ns = cluster.getNamesystem(0); + fileSys = getCluster().getFileSystem(0); + FSNamesystem ns = getCluster().getNamesystem(0); writeFile(fileSys, file1, 1); Thread.sleep(2000); @@ -1276,7 +1026,7 @@ public class TestDecommission { // Start decommissioning datanode ArrayList<DatanodeInfo> decommissioningNodes = namenodeDecomList. 
get(0); - decommissionedNodeInfo = decommissionNode(0, null, + decommissionedNodeInfo = takeNodeOutofService(0, null, 0, decommissioningNodes, decommissionState); // NodeUsage should not include DECOMMISSION_INPROGRESS node // (minUsage should be 0.00%) @@ -1286,7 +1036,7 @@ public class TestDecommission { equalsIgnoreCase(zeroNodeUsage)); } // Recommission node - recommissionNode(0, decommissionedNodeInfo); + putNodeInService(0, decommissionedNodeInfo); usage = (Map<String, Map<String, String>>) JSON.parse(ns.getNodeUsage()); String nodeusageAfterRecommi = @@ -1297,7 +1047,6 @@ public class TestDecommission { equalsIgnoreCase(nodeusageAfterRecommi)); } finally { cleanupFile(fileSys, file1); - cluster.shutdown(); } } @@ -1306,9 +1055,8 @@ public class TestDecommission { int numNamenodes = 1; int numDatanodes = 2; - startCluster(numNamenodes,numDatanodes,conf); - cluster.waitActive(); - FSNamesystem ns = cluster.getNamesystem(0); + startCluster(numNamenodes, numDatanodes); + FSNamesystem ns = getCluster().getNamesystem(0); BlockManager blockManager = ns.getBlockManager(); DatanodeStatistics datanodeStatistics = blockManager.getDatanodeManager() .getDatanodeStatistics(); @@ -1318,11 +1066,11 @@ public class TestDecommission { long initialBlockPoolUsed = datanodeStatistics.getBlockPoolUsed(); ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList = new ArrayList<ArrayList<DatanodeInfo>>(numNamenodes); - namenodeDecomList.add(0, new ArrayList<DatanodeInfo>(numDatanodes)); + namenodeDecomList.add(0, new ArrayList<>(numDatanodes)); ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(0); //decommission one node - DatanodeInfo decomNode = decommissionNode(0, null, decommissionedNodes, - AdminStates.DECOMMISSIONED); + DatanodeInfo decomNode = takeNodeOutofService(0, null, 0, + decommissionedNodes, AdminStates.DECOMMISSIONED); decommissionedNodes.add(decomNode); long newUsedCapacity = datanodeStatistics.getCapacityUsed(); long newTotalCapacity = datanodeStatistics.getCapacityTotal(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java new file mode 100644 index 0000000..63617ad --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.util.Time; +import org.junit.Test; + +/** + * This class tests node maintenance. + */ +public class TestMaintenanceState extends AdminStatesBaseTest { + public static final Log LOG = LogFactory.getLog(TestMaintenanceState.class); + static private final long EXPIRATION_IN_MS = 500; + + public TestMaintenanceState() { + setUseCombinedHostFileManager(); + } + + /** + * Verify a node can transition from AdminStates.ENTERING_MAINTENANCE to + * AdminStates.NORMAL. + */ + @Test(timeout = 360000) + public void testTakeNodeOutOfEnteringMaintenance() throws Exception { + LOG.info("Starting testTakeNodeOutOfEnteringMaintenance"); + final int replicas = 1; + final int numNamenodes = 1; + final int numDatanodes = 1; + final Path file1 = new Path("/testTakeNodeOutOfEnteringMaintenance.dat"); + + startCluster(numNamenodes, numDatanodes); + + FileSystem fileSys = getCluster().getFileSystem(0); + writeFile(fileSys, file1, replicas, 1); + + DatanodeInfo nodeOutofService = takeNodeOutofService(0, + null, Long.MAX_VALUE, null, AdminStates.ENTERING_MAINTENANCE); + + putNodeInService(0, nodeOutofService.getDatanodeUuid()); + + cleanupFile(fileSys, file1); + } + + /** + * Verify a AdminStates.ENTERING_MAINTENANCE node can expire and transition + * to AdminStates.NORMAL upon timeout. + */ + @Test(timeout = 360000) + public void testEnteringMaintenanceExpiration() throws Exception { + LOG.info("Starting testEnteringMaintenanceExpiration"); + final int replicas = 1; + final int numNamenodes = 1; + final int numDatanodes = 1; + final Path file1 = new Path("/testTakeNodeOutOfEnteringMaintenance.dat"); + + startCluster(numNamenodes, numDatanodes); + + FileSystem fileSys = getCluster().getFileSystem(0); + writeFile(fileSys, file1, replicas, 1); + + // expires in 500 milliseconds + DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, + Time.monotonicNow() + EXPIRATION_IN_MS, null, + AdminStates.ENTERING_MAINTENANCE); + + waitNodeState(nodeOutofService, AdminStates.NORMAL); + + cleanupFile(fileSys, file1); + } + + /** + * Verify node stays in AdminStates.NORMAL with invalid expiration. + */ + @Test(timeout = 360000) + public void testInvalidExpiration() throws Exception { + LOG.info("Starting testInvalidExpiration"); + final int replicas = 1; + final int numNamenodes = 1; + final int numDatanodes = 1; + final Path file1 = new Path("/testTakeNodeOutOfEnteringMaintenance.dat"); + + startCluster(numNamenodes, numDatanodes); + + FileSystem fileSys = getCluster().getFileSystem(0); + writeFile(fileSys, file1, replicas, 1); + + // expiration has to be greater than Time.monotonicNow(). 
+ takeNodeOutofService(0, null, Time.monotonicNow(), null, + AdminStates.NORMAL); + + cleanupFile(fileSys, file1); + } + + /** + * When a dead node is put to maintenance, it transitions directly to + * AdminStates.IN_MAINTENANCE. + */ + @Test(timeout = 360000) + public void testPutDeadNodeToMaintenance() throws Exception { + LOG.info("Starting testPutDeadNodeToMaintenance"); + final int numNamenodes = 1; + final int numDatanodes = 1; + final int replicas = 1; + final Path file1 = new Path("/testPutDeadNodeToMaintenance.dat"); + + startCluster(numNamenodes, numDatanodes); + + FileSystem fileSys = getCluster().getFileSystem(0); + FSNamesystem ns = getCluster().getNamesystem(0); + writeFile(fileSys, file1, replicas, 1); + + MiniDFSCluster.DataNodeProperties dnProp = getCluster().stopDataNode(0); + DFSTestUtil.waitForDatanodeState( + getCluster(), dnProp.datanode.getDatanodeUuid(), false, 20000); + + int deadInMaintenance = ns.getNumInMaintenanceDeadDataNodes(); + int liveInMaintenance = ns.getNumInMaintenanceLiveDataNodes(); + + takeNodeOutofService(0, dnProp.datanode.getDatanodeUuid(), Long.MAX_VALUE, + null, AdminStates.IN_MAINTENANCE); + + assertEquals(deadInMaintenance + 1, ns.getNumInMaintenanceDeadDataNodes()); + assertEquals(liveInMaintenance, ns.getNumInMaintenanceLiveDataNodes()); + + cleanupFile(fileSys, file1); + } + + /** + * When a dead node is put to maintenance, it transitions directly to + * AdminStates.IN_MAINTENANCE. Then AdminStates.IN_MAINTENANCE expires and + * transitions to AdminStates.NORMAL. + */ + @Test(timeout = 360000) + public void testPutDeadNodeToMaintenanceWithExpiration() throws Exception { + LOG.info("Starting testPutDeadNodeToMaintenanceWithExpiration"); + final int numNamenodes = 1; + final int numDatanodes = 1; + final int replicas = 1; + final Path file1 = new Path("/testPutDeadNodeToMaintenance.dat"); + + startCluster(numNamenodes, numDatanodes); + + FileSystem fileSys = getCluster().getFileSystem(0); + FSNamesystem ns = getCluster().getNamesystem(0); + writeFile(fileSys, file1, replicas, 1); + + MiniDFSCluster.DataNodeProperties dnProp = getCluster().stopDataNode(0); + DFSTestUtil.waitForDatanodeState( + getCluster(), dnProp.datanode.getDatanodeUuid(), false, 20000); + + int deadInMaintenance = ns.getNumInMaintenanceDeadDataNodes(); + int liveInMaintenance = ns.getNumInMaintenanceLiveDataNodes(); + + DatanodeInfo nodeOutofService = takeNodeOutofService(0, + dnProp.datanode.getDatanodeUuid(), + Time.monotonicNow() + EXPIRATION_IN_MS, null, + AdminStates.IN_MAINTENANCE); + + waitNodeState(nodeOutofService, AdminStates.NORMAL); + + // no change + assertEquals(deadInMaintenance, ns.getNumInMaintenanceDeadDataNodes()); + assertEquals(liveInMaintenance, ns.getNumInMaintenanceLiveDataNodes()); + + cleanupFile(fileSys, file1); + } + + /** + * Transition from decommissioned state to maintenance state. 
+ */ + @Test(timeout = 360000) + public void testTransitionFromDecommissioned() throws IOException { + LOG.info("Starting testTransitionFromDecommissioned"); + final int numNamenodes = 1; + final int numDatanodes = 4; + final int replicas = 3; + final Path file1 = new Path("/testTransitionFromDecommissioned.dat"); + + startCluster(numNamenodes, numDatanodes); + + FileSystem fileSys = getCluster().getFileSystem(0); + writeFile(fileSys, file1, replicas, 1); + + DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, 0, null, + AdminStates.DECOMMISSIONED); + + takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(), Long.MAX_VALUE, + null, AdminStates.IN_MAINTENANCE); + + cleanupFile(fileSys, file1); + } + + /** + * Transition from decommissioned state to maintenance state. + * After the maintenance state expires, it is transitioned to NORMAL. + */ + @Test(timeout = 360000) + public void testTransitionFromDecommissionedAndExpired() throws IOException { + LOG.info("Starting testTransitionFromDecommissionedAndExpired"); + final int numNamenodes = 1; + final int numDatanodes = 4; + final int replicas = 3; + final Path file1 = new Path("/testTransitionFromDecommissioned.dat"); + + startCluster(numNamenodes, numDatanodes); + + FileSystem fileSys = getCluster().getFileSystem(0); + writeFile(fileSys, file1, replicas, 1); + + DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, 0, null, + AdminStates.DECOMMISSIONED); + + takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(), + Time.monotonicNow() + EXPIRATION_IN_MS, null, + AdminStates.IN_MAINTENANCE); + + waitNodeState(nodeOutofService, AdminStates.NORMAL); + + cleanupFile(fileSys, file1); + } + + /** + * When a node is put to maintenance, it first transitions to + * AdminStates.ENTERING_MAINTENANCE. It makes sure all blocks have minimal + * replication before it can be transitioned to AdminStates.IN_MAINTENANCE. + * If node becomes dead when it is in AdminStates.ENTERING_MAINTENANCE, admin + * state should stay in AdminStates.ENTERING_MAINTENANCE state. 
+ */ + @Test(timeout = 360000) + public void testNodeDeadWhenInEnteringMaintenance() throws Exception { + LOG.info("Starting testNodeDeadWhenInEnteringMaintenance"); + final int numNamenodes = 1; + final int numDatanodes = 1; + final int replicas = 1; + final Path file1 = new Path("/testNodeDeadWhenInEnteringMaintenance.dat"); + + startCluster(numNamenodes, numDatanodes); + + FileSystem fileSys = getCluster().getFileSystem(0); + FSNamesystem ns = getCluster().getNamesystem(0); + writeFile(fileSys, file1, replicas, 1); + + DatanodeInfo nodeOutofService = takeNodeOutofService(0, + getFirstBlockFirstReplicaUuid(fileSys, file1), Long.MAX_VALUE, null, + AdminStates.ENTERING_MAINTENANCE); + assertEquals(1, ns.getNumEnteringMaintenanceDataNodes()); + + MiniDFSCluster.DataNodeProperties dnProp = + getCluster().stopDataNode(nodeOutofService.getXferAddr()); + DFSTestUtil.waitForDatanodeState( + getCluster(), nodeOutofService.getDatanodeUuid(), false, 20000); + DFSClient client = getDfsClient(0); + assertEquals("maintenance node shouldn't be alive", numDatanodes - 1, + client.datanodeReport(DatanodeReportType.LIVE).length); + + getCluster().restartDataNode(dnProp, true); + getCluster().waitActive(); + waitNodeState(nodeOutofService, AdminStates.ENTERING_MAINTENANCE); + assertEquals(1, ns.getNumEnteringMaintenanceDataNodes()); + + cleanupFile(fileSys, file1); + } + + static protected String getFirstBlockFirstReplicaUuid(FileSystem fileSys, + Path name) throws IOException { + // need a raw stream + assertTrue("Not HDFS:"+fileSys.getUri(), + fileSys instanceof DistributedFileSystem); + HdfsDataInputStream dis = (HdfsDataInputStream)fileSys.open(name); + Collection<LocatedBlock> dinfo = dis.getAllBlocks(); + for (LocatedBlock blk : dinfo) { // for each block + DatanodeInfo[] nodes = blk.getLocations(); + if (nodes.length > 0) { + return nodes[0].getDatanodeUuid(); + } + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java index 7c39bf8..6bb6040 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java @@ -158,7 +158,7 @@ public class TestDecommissioningStatus { // write nodename into the exclude file. 
ArrayList<String> nodes = new ArrayList<String>(decommissionedNodes); nodes.add(dnName); - hostsFileWriter.initExcludeHosts(nodes.toArray(new String[0])); + hostsFileWriter.initExcludeHosts(nodes); } private void checkDecommissionStatus(DatanodeDescriptor decommNode, http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java index 2ef0b8f..4c8fcef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java @@ -20,8 +20,11 @@ package org.apache.hadoop.hdfs.util; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; +import java.util.List; +import java.util.Map; import org.apache.commons.io.FileUtils; @@ -73,30 +76,60 @@ public class HostsFileWriter { } public void initExcludeHost(String hostNameAndPort) throws IOException { - initExcludeHosts(hostNameAndPort); + ArrayList<String> nodes = new ArrayList<>(); + nodes.add(hostNameAndPort); + initExcludeHosts(nodes); } - public void initExcludeHosts(String... hostNameAndPorts) throws IOException { + public void initExcludeHosts(List<String> hostNameAndPorts) + throws IOException { + initOutOfServiceHosts(hostNameAndPorts, null); + } + + public void initOutOfServiceHosts(List<String> decommissionHostNameAndPorts, + Map<String, Long> maintenanceHosts) throws IOException { StringBuilder excludeHosts = new StringBuilder(); if (isLegacyHostsFile) { - for (String hostNameAndPort : hostNameAndPorts) { + if (maintenanceHosts != null && maintenanceHosts.size() > 0) { + throw new UnsupportedOperationException( + "maintenance support isn't supported by legacy hosts file"); + } + for (String hostNameAndPort : decommissionHostNameAndPorts) { excludeHosts.append(hostNameAndPort).append("\n"); } - DFSTestUtil.writeFile(localFileSys, excludeFile, excludeHosts.toString()); + DFSTestUtil.writeFile(localFileSys, excludeFile, + excludeHosts.toString()); } else { HashSet<DatanodeAdminProperties> allDNs = new HashSet<>(); - for (String hostNameAndPort : hostNameAndPorts) { - DatanodeAdminProperties dn = new DatanodeAdminProperties(); - String[] hostAndPort = hostNameAndPort.split(":"); - dn.setHostName(hostAndPort[0]); - dn.setPort(Integer.parseInt(hostAndPort[1])); - dn.setAdminState(AdminStates.DECOMMISSIONED); - allDNs.add(dn); + if (decommissionHostNameAndPorts != null) { + for (String hostNameAndPort : decommissionHostNameAndPorts) { + DatanodeAdminProperties dn = new DatanodeAdminProperties(); + String[] hostAndPort = hostNameAndPort.split(":"); + dn.setHostName(hostAndPort[0]); + dn.setPort(Integer.parseInt(hostAndPort[1])); + dn.setAdminState(AdminStates.DECOMMISSIONED); + allDNs.add(dn); + } + } + if (maintenanceHosts != null) { + for (Map.Entry<String, Long> hostEntry : maintenanceHosts.entrySet()) { + DatanodeAdminProperties dn = new DatanodeAdminProperties(); + String[] hostAndPort = hostEntry.getKey().split(":"); + dn.setHostName(hostAndPort[0]); + dn.setPort(Integer.parseInt(hostAndPort[1])); + dn.setAdminState(AdminStates.IN_MAINTENANCE); + 
dn.setMaintenanceExpireTimeInMS(hostEntry.getValue()); + allDNs.add(dn); + } } CombinedHostsFileWriter.writeFile(combinedFile.toString(), allDNs); } } + public void initIncludeHost(String hostNameAndPort) throws IOException { + initIncludeHosts(new String[]{hostNameAndPort}); + } + public void initIncludeHosts(String[] hostNameAndPorts) throws IOException { StringBuilder includeHosts = new StringBuilder(); if (isLegacyHostsFile) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestCombinedHostsFileReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestCombinedHostsFileReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestCombinedHostsFileReader.java index 923cf66..b48784f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestCombinedHostsFileReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestCombinedHostsFileReader.java @@ -62,7 +62,7 @@ public class TestCombinedHostsFileReader { public void testLoadExistingJsonFile() throws Exception { Set<DatanodeAdminProperties> all = CombinedHostsFileReader.readFile(EXISTING_FILE.getAbsolutePath()); - assertEquals(5, all.size()); + assertEquals(7, all.size()); } /* http://git-wip-us.apache.org/repos/asf/hadoop/blob/9dcbdbdb/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/dfs.hosts.json ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/dfs.hosts.json b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/dfs.hosts.json index 64fca48..9c852e0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/dfs.hosts.json +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/dfs.hosts.json @@ -3,3 +3,5 @@ {"hostName": "host3", "adminState": "DECOMMISSIONED"} {"hostName": "host4", "upgradeDomain": "ud2", "adminState": "DECOMMISSIONED"} {"hostName": "host5", "port": 8090} +{"hostName": "host6", "adminState": "IN_MAINTENANCE"} +{"hostName": "host7", "adminState": "IN_MAINTENANCE", "maintenanceExpireTimeInMS": "112233"}
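A note on the HostsFileWriter change above: initOutOfServiceHosts() now takes both a decommission list and a map of host to maintenance expiration, and rejects the maintenance map when the legacy hosts-file format is in use. Below is a hedged sketch of how a caller might drive it; the host:port strings and the wrapper class are made up for illustration, and the writer is assumed to already be initialized in combined-hosts (JSON) mode, as TestMaintenanceState arranges via setUseCombinedHostFileManager().

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hdfs.util.HostsFileWriter;

public final class MaintenanceHostsExample {
  private MaintenanceHostsExample() {
  }

  /**
   * Marks one host for decommission and one for maintenance through a single
   * combined hosts file. With a legacy hosts file, the non-empty maintenance
   * map throws UnsupportedOperationException, per the check added above.
   */
  static void markHosts(HostsFileWriter writer) throws Exception {
    Map<String, Long> maintenance = new HashMap<>();
    // Expiration in milliseconds, mirroring the "maintenanceExpireTimeInMS"
    // field in the dfs.hosts.json sample above.
    maintenance.put("host7:8090", 112233L);
    writer.initOutOfServiceHosts(
        Arrays.asList("host3:8090"),   // hosts to decommission
        maintenance);                  // hosts to put in maintenance
  }
}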