http://git-wip-us.apache.org/repos/asf/hadoop/blob/9062df87/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java deleted file mode 100644 index 9f733ff..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java +++ /dev/null @@ -1,1775 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.namenode; - -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY; -import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.slf4j.LoggerFactory.getLogger; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeoutException; - -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.ReconfigurationException; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSTestUtil; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; -import org.apache.hadoop.hdfs.NameNodeProxies; -import org.apache.hadoop.hdfs.StripedFileTestUtil; -import org.apache.hadoop.hdfs.client.HdfsAdmin; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; -import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; -import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.test.GenericTestUtils.LogCapturer; -import org.junit.Assert; -import org.junit.Test; -import org.mockito.Mockito; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.event.Level; - -import com.google.common.base.Supplier; - -/** - * Tests that StoragePolicySatisfier daemon is able to check the blocks to be - * moved and finding its suggested target locations to move. - */ -public class TestStoragePolicySatisfier { - - { - GenericTestUtils.setLogLevel( - getLogger(FSTreeTraverser.class), Level.DEBUG); - } - - private static final String ONE_SSD = "ONE_SSD"; - private static final String COLD = "COLD"; - private static final Logger LOG = - LoggerFactory.getLogger(TestStoragePolicySatisfier.class); - private final Configuration config = new HdfsConfiguration(); - private StorageType[][] allDiskTypes = - new StorageType[][]{{StorageType.DISK, StorageType.DISK}, - {StorageType.DISK, StorageType.DISK}, - {StorageType.DISK, StorageType.DISK}}; - private MiniDFSCluster hdfsCluster = null; - final private int numOfDatanodes = 3; - final private int storagesPerDatanode = 2; - final private long capacity = 2 * 256 * 1024 * 1024; - final private String file = "/testMoveWhenStoragePolicyNotSatisfying"; - private DistributedFileSystem dfs = null; - private static final int DEFAULT_BLOCK_SIZE = 1024; - - private void shutdownCluster() { - if (hdfsCluster != null) { - hdfsCluster.shutdown(); - } - } - - private void createCluster() throws IOException { - config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); - config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, - true); - hdfsCluster = startCluster(config, allDiskTypes, numOfDatanodes, - storagesPerDatanode, capacity); - dfs = hdfsCluster.getFileSystem(); - writeContent(file); - } - - @Test(timeout = 300000) - public void testWhenStoragePolicySetToCOLD() - throws Exception { - - try { - createCluster(); - doTestWhenStoragePolicySetToCOLD(); - } finally { - shutdownCluster(); - } - } - - private void doTestWhenStoragePolicySetToCOLD() throws Exception { - // Change policy to COLD - dfs.setStoragePolicy(new Path(file), COLD); - - StorageType[][] newtypes = - new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}, - {StorageType.ARCHIVE, StorageType.ARCHIVE}, - {StorageType.ARCHIVE, StorageType.ARCHIVE}}; - startAdditionalDNs(config, 3, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); - - dfs.satisfyStoragePolicy(new Path(file)); - - hdfsCluster.triggerHeartbeats(); - // Wait till namenode notified about the block location details - DFSTestUtil.waitExpectedStorageType( - file, StorageType.ARCHIVE, 3, 30000, dfs); - } - - @Test(timeout = 300000) - public void testWhenStoragePolicySetToALLSSD() - throws Exception { - try { - createCluster(); - // Change policy to ALL_SSD - dfs.setStoragePolicy(new Path(file), "ALL_SSD"); - - StorageType[][] newtypes = - new StorageType[][]{{StorageType.SSD, StorageType.DISK}, - {StorageType.SSD, StorageType.DISK}, - {StorageType.SSD, StorageType.DISK}}; - - // Making sure SDD based nodes added to cluster. Adding SSD based - // datanodes. - startAdditionalDNs(config, 3, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); - dfs.satisfyStoragePolicy(new Path(file)); - hdfsCluster.triggerHeartbeats(); - // Wait till StorgePolicySatisfier Identified that block to move to SSD - // areas - DFSTestUtil.waitExpectedStorageType( - file, StorageType.SSD, 3, 30000, dfs); - } finally { - shutdownCluster(); - } - } - - @Test(timeout = 300000) - public void testWhenStoragePolicySetToONESSD() - throws Exception { - try { - createCluster(); - // Change policy to ONE_SSD - dfs.setStoragePolicy(new Path(file), ONE_SSD); - - StorageType[][] newtypes = - new StorageType[][]{{StorageType.SSD, StorageType.DISK}}; - - // Making sure SDD based nodes added to cluster. Adding SSD based - // datanodes. - startAdditionalDNs(config, 1, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); - dfs.satisfyStoragePolicy(new Path(file)); - hdfsCluster.triggerHeartbeats(); - // Wait till StorgePolicySatisfier Identified that block to move to SSD - // areas - DFSTestUtil.waitExpectedStorageType( - file, StorageType.SSD, 1, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.DISK, 2, 30000, dfs); - } finally { - shutdownCluster(); - } - } - - /** - * Tests to verify that the block storage movement report will be propagated - * to Namenode via datanode heartbeat. - */ - @Test(timeout = 300000) - public void testBlksStorageMovementAttemptFinishedReport() throws Exception { - try { - createCluster(); - // Change policy to ONE_SSD - dfs.setStoragePolicy(new Path(file), ONE_SSD); - - StorageType[][] newtypes = - new StorageType[][]{{StorageType.SSD, StorageType.DISK}}; - - // Making sure SDD based nodes added to cluster. Adding SSD based - // datanodes. - startAdditionalDNs(config, 1, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); - dfs.satisfyStoragePolicy(new Path(file)); - hdfsCluster.triggerHeartbeats(); - - // Wait till the block is moved to SSD areas - DFSTestUtil.waitExpectedStorageType( - file, StorageType.SSD, 1, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.DISK, 2, 30000, dfs); - - waitForBlocksMovementAttemptReport(1, 30000); - } finally { - shutdownCluster(); - } - } - - /** - * Tests to verify that multiple files are giving to satisfy storage policy - * and should work well altogether. - */ - @Test(timeout = 300000) - public void testMultipleFilesForSatisfyStoragePolicy() throws Exception { - try { - createCluster(); - List<String> files = new ArrayList<>(); - files.add(file); - - // Creates 4 more files. Send all of them for satisfying the storage - // policy together. - for (int i = 0; i < 4; i++) { - String file1 = "/testMoveWhenStoragePolicyNotSatisfying_" + i; - files.add(file1); - writeContent(file1); - } - // Change policy to ONE_SSD - for (String fileName : files) { - dfs.setStoragePolicy(new Path(fileName), ONE_SSD); - dfs.satisfyStoragePolicy(new Path(fileName)); - } - - StorageType[][] newtypes = - new StorageType[][]{{StorageType.SSD, StorageType.DISK}}; - - // Making sure SDD based nodes added to cluster. Adding SSD based - // datanodes. - startAdditionalDNs(config, 1, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); - hdfsCluster.triggerHeartbeats(); - - for (String fileName : files) { - // Wait till the block is moved to SSD areas - DFSTestUtil.waitExpectedStorageType( - fileName, StorageType.SSD, 1, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - fileName, StorageType.DISK, 2, 30000, dfs); - } - - waitForBlocksMovementAttemptReport(files.size(), 30000); - } finally { - shutdownCluster(); - } - } - - /** - * Tests to verify hdfsAdmin.satisfyStoragePolicy works well for file. - * @throws Exception - */ - @Test(timeout = 300000) - public void testSatisfyFileWithHdfsAdmin() throws Exception { - try { - createCluster(); - HdfsAdmin hdfsAdmin = - new HdfsAdmin(FileSystem.getDefaultUri(config), config); - // Change policy to COLD - dfs.setStoragePolicy(new Path(file), COLD); - - StorageType[][] newtypes = - new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE}, - {StorageType.DISK, StorageType.ARCHIVE}, - {StorageType.DISK, StorageType.ARCHIVE}}; - startAdditionalDNs(config, 3, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); - - hdfsAdmin.satisfyStoragePolicy(new Path(file)); - - hdfsCluster.triggerHeartbeats(); - // Wait till namenode notified about the block location details - DFSTestUtil.waitExpectedStorageType( - file, StorageType.ARCHIVE, 3, 30000, dfs); - } finally { - shutdownCluster(); - } - } - - /** - * Tests to verify hdfsAdmin.satisfyStoragePolicy works well for dir. - * @throws Exception - */ - @Test(timeout = 300000) - public void testSatisfyDirWithHdfsAdmin() throws Exception { - try { - createCluster(); - HdfsAdmin hdfsAdmin = - new HdfsAdmin(FileSystem.getDefaultUri(config), config); - final String subDir = "/subDir"; - final String subFile1 = subDir + "/subFile1"; - final String subDir2 = subDir + "/subDir2"; - final String subFile2 = subDir2 + "/subFile2"; - dfs.mkdirs(new Path(subDir)); - writeContent(subFile1); - dfs.mkdirs(new Path(subDir2)); - writeContent(subFile2); - - // Change policy to COLD - dfs.setStoragePolicy(new Path(subDir), ONE_SSD); - - StorageType[][] newtypes = - new StorageType[][]{{StorageType.SSD, StorageType.DISK}}; - startAdditionalDNs(config, 1, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); - - hdfsAdmin.satisfyStoragePolicy(new Path(subDir)); - - hdfsCluster.triggerHeartbeats(); - - // take effect for the file in the directory. - DFSTestUtil.waitExpectedStorageType( - subFile1, StorageType.SSD, 1, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - subFile1, StorageType.DISK, 2, 30000, dfs); - - // take no effect for the sub-dir's file in the directory. - DFSTestUtil.waitExpectedStorageType( - subFile2, StorageType.SSD, 1, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - subFile2, StorageType.DISK, 2, 30000, dfs); - } finally { - shutdownCluster(); - } - } - - /** - * Tests to verify hdfsAdmin.satisfyStoragePolicy exceptions. - * @throws Exception - */ - @Test(timeout = 300000) - public void testSatisfyWithExceptions() throws Exception { - try { - createCluster(); - final String nonExistingFile = "/noneExistingFile"; - hdfsCluster.getConfiguration(0). - setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, false); - hdfsCluster.restartNameNodes(); - hdfsCluster.waitActive(); - HdfsAdmin hdfsAdmin = - new HdfsAdmin(FileSystem.getDefaultUri(config), config); - - try { - hdfsAdmin.satisfyStoragePolicy(new Path(file)); - Assert.fail(String.format( - "Should failed to satisfy storage policy " - + "for %s since %s is set to false.", - file, DFS_STORAGE_POLICY_ENABLED_KEY)); - } catch (IOException e) { - Assert.assertTrue(e.getMessage().contains(String.format( - "Failed to satisfy storage policy since %s is set to false.", - DFS_STORAGE_POLICY_ENABLED_KEY))); - } - - hdfsCluster.getConfiguration(0). - setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, true); - hdfsCluster.restartNameNodes(); - hdfsCluster.waitActive(); - hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(config), config); - try { - hdfsAdmin.satisfyStoragePolicy(new Path(nonExistingFile)); - Assert.fail("Should throw FileNotFoundException for " + - nonExistingFile); - } catch (FileNotFoundException e) { - - } - - try { - hdfsAdmin.satisfyStoragePolicy(new Path(file)); - hdfsAdmin.satisfyStoragePolicy(new Path(file)); - Assert.fail(String.format( - "Should failed to satisfy storage policy " - + "for %s ,since it has been " - + "added to satisfy movement queue.", file)); - } catch (IOException e) { - GenericTestUtils.assertExceptionContains( - String.format("Cannot request to call satisfy storage policy " - + "on path %s, as this file/dir was already called for " - + "satisfying storage policy.", file), e); - } - } finally { - shutdownCluster(); - } - } - - /** - * Tests to verify that for the given path, some of the blocks or block src - * locations(src nodes) under the given path will be scheduled for block - * movement. - * - * For example, there are two block for a file: - * - * File1 => blk_1[locations=A(DISK),B(DISK),C(DISK)], - * blk_2[locations=A(DISK),B(DISK),C(DISK)]. Now, set storage policy to COLD. - * Only one datanode is available with storage type ARCHIVE, say D. - * - * SPS will schedule block movement to the coordinator node with the details, - * blk_1[move A(DISK) -> D(ARCHIVE)], blk_2[move A(DISK) -> D(ARCHIVE)]. - */ - @Test(timeout = 300000) - public void testWhenOnlyFewTargetDatanodeAreAvailableToSatisfyStoragePolicy() - throws Exception { - try { - createCluster(); - // Change policy to COLD - dfs.setStoragePolicy(new Path(file), COLD); - - StorageType[][] newtypes = - new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}}; - - // Adding ARCHIVE based datanodes. - startAdditionalDNs(config, 1, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); - - dfs.satisfyStoragePolicy(new Path(file)); - hdfsCluster.triggerHeartbeats(); - // Wait till StorgePolicySatisfier identified that block to move to - // ARCHIVE area. - DFSTestUtil.waitExpectedStorageType( - file, StorageType.ARCHIVE, 1, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.DISK, 2, 30000, dfs); - - waitForBlocksMovementAttemptReport(1, 30000); - } finally { - shutdownCluster(); - } - } - - /** - * Tests to verify that for the given path, no blocks or block src - * locations(src nodes) under the given path will be scheduled for block - * movement as there are no available datanode with required storage type. - * - * For example, there are two block for a file: - * - * File1 => blk_1[locations=A(DISK),B(DISK),C(DISK)], - * blk_2[locations=A(DISK),B(DISK),C(DISK)]. Now, set storage policy to COLD. - * No datanode is available with storage type ARCHIVE. - * - * SPS won't schedule any block movement for this path. - */ - @Test(timeout = 300000) - public void testWhenNoTargetDatanodeToSatisfyStoragePolicy() - throws Exception { - try { - createCluster(); - // Change policy to COLD - dfs.setStoragePolicy(new Path(file), COLD); - - StorageType[][] newtypes = - new StorageType[][]{{StorageType.DISK, StorageType.DISK}}; - // Adding DISK based datanodes - startAdditionalDNs(config, 1, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); - - dfs.satisfyStoragePolicy(new Path(file)); - hdfsCluster.triggerHeartbeats(); - - // No block movement will be scheduled as there is no target node - // available with the required storage type. - waitForAttemptedItems(1, 30000); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.DISK, 3, 30000, dfs); - // Since there is no target node the item will get timed out and then - // re-attempted. - waitForAttemptedItems(1, 30000); - } finally { - shutdownCluster(); - } - } - - /** - * Tests to verify that SPS should not start when a Mover instance - * is running. - */ - @Test(timeout = 300000) - public void testWhenMoverIsAlreadyRunningBeforeStoragePolicySatisfier() - throws Exception { - boolean running; - FSDataOutputStream out = null; - try { - createCluster(); - // Stop SPS - hdfsCluster.getNameNode().reconfigurePropertyImpl( - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, "false"); - running = hdfsCluster.getFileSystem() - .getClient().isStoragePolicySatisfierRunning(); - Assert.assertFalse("SPS should stopped as configured.", running); - - // Simulate the case by creating MOVER_ID file - out = hdfsCluster.getFileSystem().create( - HdfsServerConstants.MOVER_ID_PATH); - - // Restart SPS - hdfsCluster.getNameNode().reconfigurePropertyImpl( - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, "true"); - - running = hdfsCluster.getFileSystem() - .getClient().isStoragePolicySatisfierRunning(); - Assert.assertFalse("SPS should not be able to run as file " - + HdfsServerConstants.MOVER_ID_PATH + " is being hold.", running); - - // Simulate Mover exists - out.close(); - out = null; - hdfsCluster.getFileSystem().delete( - HdfsServerConstants.MOVER_ID_PATH, true); - - // Restart SPS again - hdfsCluster.getNameNode().reconfigurePropertyImpl( - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, "true"); - running = hdfsCluster.getFileSystem() - .getClient().isStoragePolicySatisfierRunning(); - Assert.assertTrue("SPS should be running as " - + "Mover already exited", running); - - // Check functionality after SPS restart - doTestWhenStoragePolicySetToCOLD(); - } catch (ReconfigurationException e) { - throw new IOException("Exception when reconfigure " - + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, e); - } finally { - if (out != null) { - out.close(); - } - hdfsCluster.shutdown(); - } - } - - /** - * Tests to verify that SPS should be able to start when the Mover ID file - * is not being hold by a Mover. This can be the case when Mover exits - * ungracefully without deleting the ID file from HDFS. - */ - @Test(timeout = 300000) - public void testWhenMoverExitsWithoutDeleteMoverIDFile() - throws IOException { - try { - createCluster(); - // Simulate the case by creating MOVER_ID file - DFSTestUtil.createFile(hdfsCluster.getFileSystem(), - HdfsServerConstants.MOVER_ID_PATH, 0, (short) 1, 0); - hdfsCluster.restartNameNode(true); - boolean running = hdfsCluster.getFileSystem() - .getClient().isStoragePolicySatisfierRunning(); - Assert.assertTrue("SPS should be running as " - + "no Mover really running", running); - } finally { - if (hdfsCluster != null) { - hdfsCluster.shutdown(); - } - } - } - - /** - * Test to verify that satisfy worker can't move blocks. If the given block is - * pinned it shouldn't be considered for retries. - */ - @Test(timeout = 120000) - public void testMoveWithBlockPinning() throws Exception { - try{ - config.setBoolean(DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED, true); - config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, - true); - hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(3) - .storageTypes( - new StorageType[][] {{StorageType.DISK, StorageType.DISK}, - {StorageType.DISK, StorageType.DISK}, - {StorageType.DISK, StorageType.DISK}}) - .build(); - - hdfsCluster.waitActive(); - dfs = hdfsCluster.getFileSystem(); - - // create a file with replication factor 3 and mark 2 pinned block - // locations. - final String file1 = createFileAndSimulateFavoredNodes(2); - - // Change policy to COLD - dfs.setStoragePolicy(new Path(file1), COLD); - - StorageType[][] newtypes = - new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}, - {StorageType.ARCHIVE, StorageType.ARCHIVE}, - {StorageType.ARCHIVE, StorageType.ARCHIVE}}; - // Adding DISK based datanodes - startAdditionalDNs(config, 3, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); - - dfs.satisfyStoragePolicy(new Path(file1)); - hdfsCluster.triggerHeartbeats(); - - // No block movement will be scheduled as there is no target node - // available with the required storage type. - waitForAttemptedItems(1, 30000); - waitForBlocksMovementAttemptReport(1, 30000); - DFSTestUtil.waitExpectedStorageType( - file1, StorageType.ARCHIVE, 1, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - file1, StorageType.DISK, 2, 30000, dfs); - } finally { - if (hdfsCluster != null) { - hdfsCluster.shutdown(); - } - } - } - - /** - * Tests to verify that for the given path, only few of the blocks or block - * src locations(src nodes) under the given path will be scheduled for block - * movement. - * - * For example, there are two block for a file: - * - * File1 => two blocks and default storage policy(HOT). - * blk_1[locations=A(DISK),B(DISK),C(DISK),D(DISK),E(DISK)], - * blk_2[locations=A(DISK),B(DISK),C(DISK),D(DISK),E(DISK)]. - * - * Now, set storage policy to COLD. - * Only two Dns are available with expected storage type ARCHIVE, say A, E. - * - * SPS will schedule block movement to the coordinator node with the details, - * blk_1[move A(DISK) -> A(ARCHIVE), move E(DISK) -> E(ARCHIVE)], - * blk_2[move A(DISK) -> A(ARCHIVE), move E(DISK) -> E(ARCHIVE)]. - */ - @Test(timeout = 300000) - public void testWhenOnlyFewSourceNodesHaveMatchingTargetNodes() - throws Exception { - try { - int numOfDns = 5; - config.setLong("dfs.block.size", 1024); - config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, - true); - allDiskTypes = - new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE}, - {StorageType.DISK, StorageType.DISK}, - {StorageType.DISK, StorageType.DISK}, - {StorageType.DISK, StorageType.DISK}, - {StorageType.DISK, StorageType.ARCHIVE}}; - hdfsCluster = startCluster(config, allDiskTypes, numOfDns, - storagesPerDatanode, capacity); - dfs = hdfsCluster.getFileSystem(); - writeContent(file, (short) 5); - - // Change policy to COLD - dfs.setStoragePolicy(new Path(file), COLD); - - dfs.satisfyStoragePolicy(new Path(file)); - hdfsCluster.triggerHeartbeats(); - // Wait till StorgePolicySatisfier identified that block to move to - // ARCHIVE area. - DFSTestUtil.waitExpectedStorageType( - file, StorageType.ARCHIVE, 2, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.DISK, 3, 30000, dfs); - - waitForBlocksMovementAttemptReport(1, 30000); - } finally { - shutdownCluster(); - } - } - - /** - * Tests that moving block storage with in the same datanode. Let's say we - * have DN1[DISK,ARCHIVE], DN2[DISK, SSD], DN3[DISK,RAM_DISK] when - * storagepolicy set to ONE_SSD and request satisfyStoragePolicy, then block - * should move to DN2[SSD] successfully. - */ - @Test(timeout = 300000) - public void testBlockMoveInSameDatanodeWithONESSD() throws Exception { - StorageType[][] diskTypes = - new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE}, - {StorageType.DISK, StorageType.SSD}, - {StorageType.DISK, StorageType.RAM_DISK}}; - config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); - config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, - true); - try { - hdfsCluster = startCluster(config, diskTypes, numOfDatanodes, - storagesPerDatanode, capacity); - dfs = hdfsCluster.getFileSystem(); - writeContent(file); - - // Change policy to ONE_SSD - dfs.setStoragePolicy(new Path(file), ONE_SSD); - - dfs.satisfyStoragePolicy(new Path(file)); - hdfsCluster.triggerHeartbeats(); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.SSD, 1, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.DISK, 2, 30000, dfs); - - } finally { - shutdownCluster(); - } - } - - /** - * Tests that moving block storage with in the same datanode and remote node. - * Let's say we have DN1[DISK,ARCHIVE], DN2[ARCHIVE, SSD], DN3[DISK,DISK], - * DN4[DISK,DISK] when storagepolicy set to WARM and request - * satisfyStoragePolicy, then block should move to DN1[ARCHIVE] and - * DN2[ARCHIVE] successfully. - */ - @Test(timeout = 300000) - public void testBlockMoveInSameAndRemoteDatanodesWithWARM() throws Exception { - StorageType[][] diskTypes = - new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE}, - {StorageType.ARCHIVE, StorageType.SSD}, - {StorageType.DISK, StorageType.DISK}, - {StorageType.DISK, StorageType.DISK}}; - - config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); - config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, - true); - try { - hdfsCluster = startCluster(config, diskTypes, diskTypes.length, - storagesPerDatanode, capacity); - dfs = hdfsCluster.getFileSystem(); - writeContent(file); - - // Change policy to WARM - dfs.setStoragePolicy(new Path(file), "WARM"); - dfs.satisfyStoragePolicy(new Path(file)); - hdfsCluster.triggerHeartbeats(); - - DFSTestUtil.waitExpectedStorageType( - file, StorageType.DISK, 1, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.ARCHIVE, 2, 30000, dfs); - } finally { - shutdownCluster(); - } - } - - /** - * If replica with expected storage type already exist in source DN then that - * DN should be skipped. - */ - @Test(timeout = 300000) - public void testSPSWhenReplicaWithExpectedStorageAlreadyAvailableInSource() - throws Exception { - StorageType[][] diskTypes = new StorageType[][] { - {StorageType.DISK, StorageType.ARCHIVE}, - {StorageType.DISK, StorageType.ARCHIVE}, - {StorageType.DISK, StorageType.ARCHIVE}}; - - try { - config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, - true); - hdfsCluster = startCluster(config, diskTypes, diskTypes.length, - storagesPerDatanode, capacity); - dfs = hdfsCluster.getFileSystem(); - // 1. Write two replica on disk - DFSTestUtil.createFile(dfs, new Path(file), DEFAULT_BLOCK_SIZE, - (short) 2, 0); - // 2. Change policy to COLD, so third replica will be written to ARCHIVE. - dfs.setStoragePolicy(new Path(file), "COLD"); - - // 3.Change replication factor to 3. - dfs.setReplication(new Path(file), (short) 3); - - DFSTestUtil - .waitExpectedStorageType(file, StorageType.DISK, 2, 30000, dfs); - DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 30000, - dfs); - - // 4. Change policy to HOT, so we can move the all block to DISK. - dfs.setStoragePolicy(new Path(file), "HOT"); - - // 4. Satisfy the policy. - dfs.satisfyStoragePolicy(new Path(file)); - - // 5. Block should move successfully . - DFSTestUtil - .waitExpectedStorageType(file, StorageType.DISK, 3, 30000, dfs); - } finally { - shutdownCluster(); - } - } - - /** - * Tests that movements should not be assigned when there is no space in - * target DN. - */ - @Test(timeout = 300000) - public void testChooseInSameDatanodeWithONESSDShouldNotChooseIfNoSpace() - throws Exception { - StorageType[][] diskTypes = - new StorageType[][]{{StorageType.DISK, StorageType.DISK}, - {StorageType.DISK, StorageType.SSD}, - {StorageType.DISK, StorageType.DISK}}; - config.setLong("dfs.block.size", 2 * DEFAULT_BLOCK_SIZE); - config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, - true); - long dnCapacity = 1024 * DEFAULT_BLOCK_SIZE + (2 * DEFAULT_BLOCK_SIZE - 1); - try { - hdfsCluster = startCluster(config, diskTypes, numOfDatanodes, - storagesPerDatanode, dnCapacity); - dfs = hdfsCluster.getFileSystem(); - writeContent(file); - - // Change policy to ONE_SSD - dfs.setStoragePolicy(new Path(file), ONE_SSD); - Path filePath = new Path("/testChooseInSameDatanode"); - final FSDataOutputStream out = - dfs.create(filePath, false, 100, (short) 1, 2 * DEFAULT_BLOCK_SIZE); - try { - dfs.setStoragePolicy(filePath, ONE_SSD); - // Try to fill up SSD part by writing content - long remaining = dfs.getStatus().getRemaining() / (3 * 2); - for (int i = 0; i < remaining; i++) { - out.write(i); - } - } finally { - out.close(); - } - hdfsCluster.triggerHeartbeats(); - ArrayList<DataNode> dataNodes = hdfsCluster.getDataNodes(); - // Temporarily disable heart beats, so that we can assert whether any - // items schedules for DNs even though DN's does not have space to write. - // Disabling heart beats can keep scheduled items on DatanodeDescriptor - // itself. - for (DataNode dataNode : dataNodes) { - DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true); - } - dfs.satisfyStoragePolicy(new Path(file)); - - // Wait for items to be processed - waitForAttemptedItems(1, 30000); - - // Make sure no items assigned for movements - Set<DatanodeDescriptor> dns = hdfsCluster.getNamesystem() - .getBlockManager().getDatanodeManager().getDatanodes(); - for (DatanodeDescriptor dd : dns) { - assertNull(dd.getBlocksToMoveStorages(1)); - } - - // Enable heart beats now - for (DataNode dataNode : dataNodes) { - DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, false); - } - hdfsCluster.triggerHeartbeats(); - - DFSTestUtil.waitExpectedStorageType(file, StorageType.DISK, 3, 30000, - dfs); - DFSTestUtil.waitExpectedStorageType(file, StorageType.SSD, 0, 30000, dfs); - } finally { - shutdownCluster(); - } - } - - /** - * Tests that Xattrs should be cleaned if satisfy storage policy called on EC - * file with unsuitable storage policy set. - * - * @throws Exception - */ - @Test(timeout = 300000) - public void testSPSShouldNotLeakXattrIfSatisfyStoragePolicyCallOnECFiles() - throws Exception { - StorageType[][] diskTypes = - new StorageType[][]{{StorageType.SSD, StorageType.DISK}, - {StorageType.SSD, StorageType.DISK}, - {StorageType.SSD, StorageType.DISK}, - {StorageType.SSD, StorageType.DISK}, - {StorageType.SSD, StorageType.DISK}, - {StorageType.DISK, StorageType.SSD}, - {StorageType.DISK, StorageType.SSD}, - {StorageType.DISK, StorageType.SSD}, - {StorageType.DISK, StorageType.SSD}, - {StorageType.DISK, StorageType.SSD}}; - - int defaultStripedBlockSize = - StripedFileTestUtil.getDefaultECPolicy().getCellSize() * 4; - config.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultStripedBlockSize); - config.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); - config.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, - 1L); - config.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY, - false); - config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, - true); - try { - hdfsCluster = startCluster(config, diskTypes, diskTypes.length, - storagesPerDatanode, capacity); - dfs = hdfsCluster.getFileSystem(); - dfs.enableErasureCodingPolicy( - StripedFileTestUtil.getDefaultECPolicy().getName()); - - // set "/foo" directory with ONE_SSD storage policy. - ClientProtocol client = NameNodeProxies.createProxy(config, - hdfsCluster.getFileSystem(0).getUri(), ClientProtocol.class) - .getProxy(); - String fooDir = "/foo"; - client.mkdirs(fooDir, new FsPermission((short) 777), true); - // set an EC policy on "/foo" directory - client.setErasureCodingPolicy(fooDir, - StripedFileTestUtil.getDefaultECPolicy().getName()); - - // write file to fooDir - final String testFile = "/foo/bar"; - long fileLen = 20 * defaultStripedBlockSize; - DFSTestUtil.createFile(dfs, new Path(testFile), fileLen, (short) 3, 0); - - // ONESSD is unsuitable storage policy on EC files - client.setStoragePolicy(fooDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME); - dfs.satisfyStoragePolicy(new Path(testFile)); - - // Thread.sleep(9000); // To make sure SPS triggered - // verify storage types and locations - LocatedBlocks locatedBlocks = - client.getBlockLocations(testFile, 0, fileLen); - for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) { - for (StorageType type : lb.getStorageTypes()) { - Assert.assertEquals(StorageType.DISK, type); - } - } - - // Make sure satisfy xattr has been removed. - DFSTestUtil.waitForXattrRemoved(testFile, XATTR_SATISFY_STORAGE_POLICY, - hdfsCluster.getNamesystem(), 30000); - } finally { - shutdownCluster(); - } - } - - /** - * Test SPS with empty file. - * 1. Create one empty file. - * 2. Call satisfyStoragePolicy for empty file. - * 3. SPS should skip this file and xattr should not be added for empty file. - */ - @Test(timeout = 300000) - public void testSPSWhenFileLengthIsZero() throws Exception { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, - true); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build(); - cluster.waitActive(); - DistributedFileSystem fs = cluster.getFileSystem(); - Path filePath = new Path("/zeroSizeFile"); - DFSTestUtil.createFile(fs, filePath, 0, (short) 1, 0); - FSEditLog editlog = cluster.getNameNode().getNamesystem().getEditLog(); - long lastWrittenTxId = editlog.getLastWrittenTxId(); - fs.satisfyStoragePolicy(filePath); - Assert.assertEquals("Xattr should not be added for the file", - lastWrittenTxId, editlog.getLastWrittenTxId()); - INode inode = cluster.getNameNode().getNamesystem().getFSDirectory() - .getINode(filePath.toString()); - Assert.assertTrue("XAttrFeature should be null for file", - inode.getXAttrFeature() == null); - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } - - /** - * Test SPS for low redundant file blocks. - * 1. Create cluster with 3 datanode. - * 1. Create one file with 3 replica. - * 2. Set policy and call satisfyStoragePolicy for file. - * 3. Stop NameNode and Datanodes. - * 4. Start NameNode with 2 datanode and wait for block movement. - * 5. Start third datanode. - * 6. Third Datanode replica also should be moved in proper - * sorage based on policy. - */ - @Test(timeout = 300000) - public void testSPSWhenFileHasLowRedundancyBlocks() throws Exception { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, - true); - conf.set(DFSConfigKeys - .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY, - "3000"); - StorageType[][] newtypes = new StorageType[][] { - {StorageType.ARCHIVE, StorageType.DISK}, - {StorageType.ARCHIVE, StorageType.DISK}, - {StorageType.ARCHIVE, StorageType.DISK}}; - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3) - .storageTypes(newtypes).build(); - cluster.waitActive(); - DistributedFileSystem fs = cluster.getFileSystem(); - Path filePath = new Path("/zeroSizeFile"); - DFSTestUtil.createFile(fs, filePath, 1024, (short) 3, 0); - fs.setStoragePolicy(filePath, "COLD"); - List<DataNodeProperties> list = new ArrayList<>(); - list.add(cluster.stopDataNode(0)); - list.add(cluster.stopDataNode(0)); - list.add(cluster.stopDataNode(0)); - cluster.restartNameNodes(); - cluster.restartDataNode(list.get(0), false); - cluster.restartDataNode(list.get(1), false); - cluster.waitActive(); - fs.satisfyStoragePolicy(filePath); - DFSTestUtil.waitExpectedStorageType(filePath.toString(), - StorageType.ARCHIVE, 2, 30000, cluster.getFileSystem()); - cluster.restartDataNode(list.get(2), false); - DFSTestUtil.waitExpectedStorageType(filePath.toString(), - StorageType.ARCHIVE, 3, 30000, cluster.getFileSystem()); - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } - - /** - * Test SPS for extra redundant file blocks. - * 1. Create cluster with 5 datanode. - * 2. Create one file with 5 replica. - * 3. Set file replication to 3. - * 4. Set policy and call satisfyStoragePolicy for file. - * 5. Block should be moved successfully. - */ - @Test(timeout = 300000) - public void testSPSWhenFileHasExcessRedundancyBlocks() throws Exception { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, - true); - conf.set(DFSConfigKeys - .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY, - "3000"); - StorageType[][] newtypes = new StorageType[][] { - {StorageType.ARCHIVE, StorageType.DISK}, - {StorageType.ARCHIVE, StorageType.DISK}, - {StorageType.ARCHIVE, StorageType.DISK}, - {StorageType.ARCHIVE, StorageType.DISK}, - {StorageType.ARCHIVE, StorageType.DISK}}; - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5) - .storageTypes(newtypes).build(); - cluster.waitActive(); - DistributedFileSystem fs = cluster.getFileSystem(); - Path filePath = new Path("/zeroSizeFile"); - DFSTestUtil.createFile(fs, filePath, 1024, (short) 5, 0); - fs.setReplication(filePath, (short) 3); - LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs( - LogFactory.getLog(BlockStorageMovementAttemptedItems.class)); - fs.setStoragePolicy(filePath, "COLD"); - fs.satisfyStoragePolicy(filePath); - DFSTestUtil.waitExpectedStorageType(filePath.toString(), - StorageType.ARCHIVE, 3, 30000, cluster.getFileSystem()); - assertFalse("Log output does not contain expected log message: ", - logs.getOutput().contains("some of the blocks are low redundant")); - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } - - /** - * Test SPS for empty directory, xAttr should be removed. - */ - @Test(timeout = 300000) - public void testSPSForEmptyDirectory() throws IOException, TimeoutException, - InterruptedException { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, - true); - cluster = new MiniDFSCluster.Builder(conf).build(); - cluster.waitActive(); - DistributedFileSystem fs = cluster.getFileSystem(); - Path emptyDir = new Path("/emptyDir"); - fs.mkdirs(emptyDir); - fs.satisfyStoragePolicy(emptyDir); - // Make sure satisfy xattr has been removed. - DFSTestUtil.waitForXattrRemoved("/emptyDir", - XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000); - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } - - /** - * Test SPS for not exist directory. - */ - @Test(timeout = 300000) - public void testSPSForNonExistDirectory() throws Exception { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, - true); - cluster = new MiniDFSCluster.Builder(conf).build(); - cluster.waitActive(); - DistributedFileSystem fs = cluster.getFileSystem(); - Path emptyDir = new Path("/emptyDir"); - try { - fs.satisfyStoragePolicy(emptyDir); - fail("FileNotFoundException should throw"); - } catch (FileNotFoundException e) { - // nothing to do - } - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } - - /** - * Test SPS for directory tree which doesn't have files. - */ - @Test(timeout = 300000) - public void testSPSWithDirectoryTreeWithoutFile() throws Exception { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, - true); - cluster = new MiniDFSCluster.Builder(conf).build(); - cluster.waitActive(); - // Create directories - /* - * root - * | - * A--------C--------D - * | - * G----H----I - * | - * O - */ - DistributedFileSystem fs = cluster.getFileSystem(); - fs.mkdirs(new Path("/root/C/H/O")); - fs.mkdirs(new Path("/root/A")); - fs.mkdirs(new Path("/root/D")); - fs.mkdirs(new Path("/root/C/G")); - fs.mkdirs(new Path("/root/C/I")); - fs.satisfyStoragePolicy(new Path("/root")); - // Make sure satisfy xattr has been removed. - DFSTestUtil.waitForXattrRemoved("/root", - XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000); - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } - - /** - * Test SPS for directory which has multilevel directories. - */ - @Test(timeout = 300000) - public void testMultipleLevelDirectoryForSatisfyStoragePolicy() - throws Exception { - try { - StorageType[][] diskTypes = new StorageType[][] { - {StorageType.DISK, StorageType.ARCHIVE}, - {StorageType.ARCHIVE, StorageType.SSD}, - {StorageType.DISK, StorageType.DISK}}; - config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); - config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, - true); - hdfsCluster = startCluster(config, diskTypes, diskTypes.length, - storagesPerDatanode, capacity); - dfs = hdfsCluster.getFileSystem(); - createDirectoryTree(dfs); - - List<String> files = getDFSListOfTree(); - dfs.setStoragePolicy(new Path("/root"), COLD); - dfs.satisfyStoragePolicy(new Path("/root")); - for (String fileName : files) { - // Wait till the block is moved to ARCHIVE - DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2, - 30000, dfs); - } - } finally { - shutdownCluster(); - } - } - - /** - * Test SPS for batch processing. - */ - @Test(timeout = 3000000) - public void testBatchProcessingForSPSDirectory() throws Exception { - try { - StorageType[][] diskTypes = new StorageType[][] { - {StorageType.DISK, StorageType.ARCHIVE}, - {StorageType.ARCHIVE, StorageType.SSD}, - {StorageType.DISK, StorageType.DISK}}; - config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); - config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, - true); - // Set queue max capacity - config.setInt(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, - 5); - hdfsCluster = startCluster(config, diskTypes, diskTypes.length, - storagesPerDatanode, capacity); - dfs = hdfsCluster.getFileSystem(); - createDirectoryTree(dfs); - List<String> files = getDFSListOfTree(); - LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(LogFactory - .getLog(FSTreeTraverser.class)); - - dfs.setStoragePolicy(new Path("/root"), COLD); - dfs.satisfyStoragePolicy(new Path("/root")); - for (String fileName : files) { - // Wait till the block is moved to ARCHIVE - DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2, - 30000, dfs); - } - waitForBlocksMovementAttemptReport(files.size(), 30000); - String expectedLogMessage = "StorageMovementNeeded queue remaining" - + " capacity is zero"; - assertTrue("Log output does not contain expected log message: " - + expectedLogMessage, logs.getOutput().contains(expectedLogMessage)); - } finally { - shutdownCluster(); - } - } - - - /** - * Test traverse when parent got deleted. - * 1. Delete /root when traversing Q - * 2. U, R, S should not be in queued. - */ - @Test(timeout = 300000) - public void testTraverseWhenParentDeleted() throws Exception { - StorageType[][] diskTypes = new StorageType[][] { - {StorageType.DISK, StorageType.ARCHIVE}, - {StorageType.ARCHIVE, StorageType.SSD}, - {StorageType.DISK, StorageType.DISK}}; - config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); - hdfsCluster = startCluster(config, diskTypes, diskTypes.length, - storagesPerDatanode, capacity); - dfs = hdfsCluster.getFileSystem(); - createDirectoryTree(dfs); - - List<String> expectedTraverseOrder = getDFSListOfTree(); - - //Remove files which will not be traverse when parent is deleted - expectedTraverseOrder.remove("/root/D/L/R"); - expectedTraverseOrder.remove("/root/D/L/S"); - expectedTraverseOrder.remove("/root/D/L/Q/U"); - FSDirectory fsDir = hdfsCluster.getNamesystem().getFSDirectory(); - - //Queue limit can control the traverse logic to wait for some free - //entry in queue. After 10 files, traverse control will be on U. - StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class); - Mockito.when(sps.isRunning()).thenReturn(true); - BlockStorageMovementNeeded movmentNeededQueue = - new BlockStorageMovementNeeded(fsDir.getFSNamesystem(), sps, 10); - INode rootINode = fsDir.getINode("/root"); - movmentNeededQueue.addToPendingDirQueue(rootINode.getId()); - movmentNeededQueue.init(); - - //Wait for thread to reach U. - Thread.sleep(1000); - - dfs.delete(new Path("/root/D/L"), true); - - // Remove 10 element and make queue free, So other traversing will start. - for (int i = 0; i < 10; i++) { - String path = expectedTraverseOrder.remove(0); - long trackId = movmentNeededQueue.get().getTrackId(); - INode inode = fsDir.getInode(trackId); - assertTrue("Failed to traverse tree, expected " + path + " but got " - + inode.getFullPathName(), path.equals(inode.getFullPathName())); - } - //Wait to finish tree traverse - Thread.sleep(5000); - - // Check other element traversed in order and R,S should not be added in - // queue which we already removed from expected list - for (String path : expectedTraverseOrder) { - long trackId = movmentNeededQueue.get().getTrackId(); - INode inode = fsDir.getInode(trackId); - assertTrue("Failed to traverse tree, expected " + path + " but got " - + inode.getFullPathName(), path.equals(inode.getFullPathName())); - } - dfs.delete(new Path("/root"), true); - } - - /** - * Test traverse when root parent got deleted. - * 1. Delete L when traversing Q - * 2. E, M, U, R, S should not be in queued. - */ - @Test(timeout = 300000) - public void testTraverseWhenRootParentDeleted() throws Exception { - StorageType[][] diskTypes = new StorageType[][] { - {StorageType.DISK, StorageType.ARCHIVE}, - {StorageType.ARCHIVE, StorageType.SSD}, - {StorageType.DISK, StorageType.DISK}}; - config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); - hdfsCluster = startCluster(config, diskTypes, diskTypes.length, - storagesPerDatanode, capacity); - dfs = hdfsCluster.getFileSystem(); - createDirectoryTree(dfs); - - List<String> expectedTraverseOrder = getDFSListOfTree(); - - // Remove files which will not be traverse when parent is deleted - expectedTraverseOrder.remove("/root/D/L/R"); - expectedTraverseOrder.remove("/root/D/L/S"); - expectedTraverseOrder.remove("/root/D/L/Q/U"); - expectedTraverseOrder.remove("/root/D/M"); - expectedTraverseOrder.remove("/root/E"); - FSDirectory fsDir = hdfsCluster.getNamesystem().getFSDirectory(); - StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class); - Mockito.when(sps.isRunning()).thenReturn(true); - // Queue limit can control the traverse logic to wait for some free - // entry in queue. After 10 files, traverse control will be on U. - BlockStorageMovementNeeded movmentNeededQueue = - new BlockStorageMovementNeeded(fsDir.getFSNamesystem(), sps, 10); - movmentNeededQueue.init(); - INode rootINode = fsDir.getINode("/root"); - movmentNeededQueue.addToPendingDirQueue(rootINode.getId()); - // Wait for thread to reach U. - Thread.sleep(1000); - - dfs.delete(new Path("/root/D/L"), true); - - // Remove 10 element and make queue free, So other traversing will start. - for (int i = 0; i < 10; i++) { - String path = expectedTraverseOrder.remove(0); - long trackId = movmentNeededQueue.get().getTrackId(); - INode inode = fsDir.getInode(trackId); - assertTrue("Failed to traverse tree, expected " + path + " but got " - + inode.getFullPathName(), path.equals(inode.getFullPathName())); - } - // Wait to finish tree traverse - Thread.sleep(5000); - - // Check other element traversed in order and E, M, U, R, S should not be - // added in queue which we already removed from expected list - for (String path : expectedTraverseOrder) { - long trackId = movmentNeededQueue.get().getTrackId(); - INode inode = fsDir.getInode(trackId); - assertTrue("Failed to traverse tree, expected " + path + " but got " - + inode.getFullPathName(), path.equals(inode.getFullPathName())); - } - dfs.delete(new Path("/root"), true); - } - - /** - * Test storage move blocks while under replication block tasks exists in the - * system. So, both will share the max transfer streams. - * - * 1. Create cluster with 3 datanode. - * 2. Create 20 files with 2 replica. - * 3. Start 2 more DNs with DISK & SSD types - * 4. SetReplication factor for the 1st 10 files to 4 to trigger replica task - * 5. Set policy to SSD to the 2nd set of files from 11-20 - * 6. Call SPS for 11-20 files to trigger move block tasks to new DNs - * 7. Wait for the under replica and SPS tasks completion - */ - @Test(timeout = 300000) - public void testMoveBlocksWithUnderReplicatedBlocks() throws Exception { - try { - config.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 3); - config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); - config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, - true); - config.set(DFSConfigKeys - .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY, - "3000"); - config.setBoolean(DFSConfigKeys - .DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_KEY, - false); - - StorageType[][] storagetypes = new StorageType[][] { - {StorageType.ARCHIVE, StorageType.DISK}, - {StorageType.ARCHIVE, StorageType.DISK}}; - hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(2) - .storageTypes(storagetypes).build(); - hdfsCluster.waitActive(); - dfs = hdfsCluster.getFileSystem(); - - // Below files will be used for pending replication block tasks. - for (int i=1; i<=20; i++){ - Path filePath = new Path("/file" + i); - DFSTestUtil.createFile(dfs, filePath, DEFAULT_BLOCK_SIZE * 5, (short) 2, - 0); - } - - StorageType[][] newtypes = - new StorageType[][]{{StorageType.DISK, StorageType.SSD}, - {StorageType.DISK, StorageType.SSD}}; - startAdditionalDNs(config, 2, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); - - // increase replication factor to 4 for the first 10 files and thus - // initiate replica tasks - for (int i=1; i<=10; i++){ - Path filePath = new Path("/file" + i); - dfs.setReplication(filePath, (short) 4); - } - - // invoke SPS for 11-20 files - for (int i = 11; i <= 20; i++) { - Path filePath = new Path("/file" + i); - dfs.setStoragePolicy(filePath, "ALL_SSD"); - dfs.satisfyStoragePolicy(filePath); - } - - for (int i = 1; i <= 10; i++) { - Path filePath = new Path("/file" + i); - DFSTestUtil.waitExpectedStorageType(filePath.toString(), - StorageType.DISK, 4, 30000, hdfsCluster.getFileSystem()); - } - for (int i = 11; i <= 20; i++) { - Path filePath = new Path("/file" + i); - DFSTestUtil.waitExpectedStorageType(filePath.toString(), - StorageType.SSD, 2, 30000, hdfsCluster.getFileSystem()); - } - } finally { - shutdownCluster(); - } - } - - @Test(timeout = 300000) - public void testStoragePolicySatisfyPathStatus() throws Exception { - try { - config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, - true); - config.set(DFSConfigKeys - .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY, - "3000"); - config.setBoolean(DFSConfigKeys - .DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_KEY, - false); - - StorageType[][] storagetypes = new StorageType[][] { - {StorageType.ARCHIVE, StorageType.DISK}, - {StorageType.ARCHIVE, StorageType.DISK}}; - hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(2) - .storageTypes(storagetypes).build(); - hdfsCluster.waitActive(); - BlockStorageMovementNeeded.setStatusClearanceElapsedTimeMs(20000); - dfs = hdfsCluster.getFileSystem(); - Path filePath = new Path("/file"); - DFSTestUtil.createFile(dfs, filePath, 1024, (short) 2, - 0); - dfs.setStoragePolicy(filePath, "COLD"); - dfs.satisfyStoragePolicy(filePath); - StoragePolicySatisfyPathStatus status = dfs.getClient() - .checkStoragePolicySatisfyPathStatus(filePath.toString()); - Assert.assertTrue("Status should be IN_PROGRESS", - StoragePolicySatisfyPathStatus.IN_PROGRESS.equals(status)); - DFSTestUtil.waitExpectedStorageType(filePath.toString(), - StorageType.ARCHIVE, 2, 30000, dfs); - - // wait till status is SUCCESS - GenericTestUtils.waitFor(new Supplier<Boolean>() { - @Override - public Boolean get() { - try { - StoragePolicySatisfyPathStatus status = dfs.getClient() - .checkStoragePolicySatisfyPathStatus(filePath.toString()); - return StoragePolicySatisfyPathStatus.SUCCESS.equals(status); - } catch (IOException e) { - Assert.fail("Fail to get path status for sps"); - } - return false; - } - }, 100, 60000); - - // wait till status is NOT_AVAILABLE - GenericTestUtils.waitFor(new Supplier<Boolean>() { - @Override - public Boolean get() { - try { - StoragePolicySatisfyPathStatus status = dfs.getClient() - .checkStoragePolicySatisfyPathStatus(filePath.toString()); - return StoragePolicySatisfyPathStatus.NOT_AVAILABLE.equals(status); - } catch (IOException e) { - Assert.fail("Fail to get path status for sps"); - } - return false; - } - }, 100, 60000); - } finally { - shutdownCluster(); - } - } - - @Test(timeout = 300000) - public void testMaxRetryForFailedBlock() throws Exception { - try { - config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, - true); - config.set(DFSConfigKeys - .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY, - "1000"); - config.set(DFSConfigKeys - .DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY, - "1000"); - StorageType[][] storagetypes = new StorageType[][] { - {StorageType.DISK, StorageType.DISK}, - {StorageType.DISK, StorageType.DISK}}; - hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(2) - .storageTypes(storagetypes).build(); - hdfsCluster.waitActive(); - dfs = hdfsCluster.getFileSystem(); - - Path filePath = new Path("/retryFile"); - DFSTestUtil.createFile(dfs, filePath, DEFAULT_BLOCK_SIZE, (short) 2, - 0); - - dfs.setStoragePolicy(filePath, "COLD"); - dfs.satisfyStoragePolicy(filePath); - Thread.sleep(3000 - * DFSConfigKeys - .DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT); - DFSTestUtil.waitExpectedStorageType(filePath.toString(), - StorageType.DISK, 2, 60000, hdfsCluster.getFileSystem()); - // Path status should be FAILURE - GenericTestUtils.waitFor(new Supplier<Boolean>() { - @Override - public Boolean get() { - try { - StoragePolicySatisfyPathStatus status = dfs.getClient() - .checkStoragePolicySatisfyPathStatus(filePath.toString()); - return StoragePolicySatisfyPathStatus.FAILURE.equals(status); - } catch (IOException e) { - Assert.fail("Fail to get path status for sps"); - } - return false; - } - }, 100, 90000); - } finally { - shutdownCluster(); - } - } - - private static void createDirectoryTree(DistributedFileSystem dfs) - throws Exception { - // tree structure - /* - * root - * | - * A--------B--------C--------D--------E - * | | - * F----G----H----I J----K----L----M - * | | - * N----O----P Q----R----S - * | | - * T U - */ - // create root Node and child - dfs.mkdirs(new Path("/root")); - DFSTestUtil.createFile(dfs, new Path("/root/A"), 1024, (short) 3, 0); - dfs.mkdirs(new Path("/root/B")); - DFSTestUtil.createFile(dfs, new Path("/root/C"), 1024, (short) 3, 0); - dfs.mkdirs(new Path("/root/D")); - DFSTestUtil.createFile(dfs, new Path("/root/E"), 1024, (short) 3, 0); - - // Create /root/B child - DFSTestUtil.createFile(dfs, new Path("/root/B/F"), 1024, (short) 3, 0); - dfs.mkdirs(new Path("/root/B/G")); - DFSTestUtil.createFile(dfs, new Path("/root/B/H"), 1024, (short) 3, 0); - DFSTestUtil.createFile(dfs, new Path("/root/B/I"), 1024, (short) 3, 0); - - // Create /root/D child - DFSTestUtil.createFile(dfs, new Path("/root/D/J"), 1024, (short) 3, 0); - DFSTestUtil.createFile(dfs, new Path("/root/D/K"), 1024, (short) 3, 0); - dfs.mkdirs(new Path("/root/D/L")); - DFSTestUtil.createFile(dfs, new Path("/root/D/M"), 1024, (short) 3, 0); - - // Create /root/B/G child - DFSTestUtil.createFile(dfs, new Path("/root/B/G/N"), 1024, (short) 3, 0); - DFSTestUtil.createFile(dfs, new Path("/root/B/G/O"), 1024, (short) 3, 0); - dfs.mkdirs(new Path("/root/B/G/P")); - - // Create /root/D/L child - dfs.mkdirs(new Path("/root/D/L/Q")); - DFSTestUtil.createFile(dfs, new Path("/root/D/L/R"), 1024, (short) 3, 0); - DFSTestUtil.createFile(dfs, new Path("/root/D/L/S"), 1024, (short) 3, 0); - - // Create /root/B/G/P child - DFSTestUtil.createFile(dfs, new Path("/root/B/G/P/T"), 1024, (short) 3, 0); - - // Create /root/D/L/Q child - DFSTestUtil.createFile(dfs, new Path("/root/D/L/Q/U"), 1024, (short) 3, 0); - } - - private List<String> getDFSListOfTree() { - List<String> dfsList = new ArrayList<>(); - dfsList.add("/root/A"); - dfsList.add("/root/B/F"); - dfsList.add("/root/B/G/N"); - dfsList.add("/root/B/G/O"); - dfsList.add("/root/B/G/P/T"); - dfsList.add("/root/B/H"); - dfsList.add("/root/B/I"); - dfsList.add("/root/C"); - dfsList.add("/root/D/J"); - dfsList.add("/root/D/K"); - dfsList.add("/root/D/L/Q/U"); - dfsList.add("/root/D/L/R"); - dfsList.add("/root/D/L/S"); - dfsList.add("/root/D/M"); - dfsList.add("/root/E"); - return dfsList; - } - - private String createFileAndSimulateFavoredNodes(int favoredNodesCount) - throws IOException { - ArrayList<DataNode> dns = hdfsCluster.getDataNodes(); - final String file1 = "/testMoveWithBlockPinning"; - // replication factor 3 - InetSocketAddress[] favoredNodes = new InetSocketAddress[favoredNodesCount]; - for (int i = 0; i < favoredNodesCount; i++) { - favoredNodes[i] = dns.get(i).getXferAddress(); - } - DFSTestUtil.createFile(dfs, new Path(file1), false, 1024, 100, - DEFAULT_BLOCK_SIZE, (short) 3, 0, false, favoredNodes); - - LocatedBlocks locatedBlocks = dfs.getClient().getLocatedBlocks(file1, 0); - Assert.assertEquals("Wrong block count", 1, - locatedBlocks.locatedBlockCount()); - - // verify storage type before movement - LocatedBlock lb = locatedBlocks.get(0); - StorageType[] storageTypes = lb.getStorageTypes(); - for (StorageType storageType : storageTypes) { - Assert.assertTrue(StorageType.DISK == storageType); - } - - // Mock FsDatasetSpi#getPinning to show that the block is pinned. - DatanodeInfo[] locations = lb.getLocations(); - Assert.assertEquals(3, locations.length); - Assert.assertTrue(favoredNodesCount < locations.length); - for(DatanodeInfo dnInfo: locations){ - LOG.info("Simulate block pinning in datanode {}", - locations[favoredNodesCount]); - DataNode dn = hdfsCluster.getDataNode(dnInfo.getIpcPort()); - InternalDataNodeTestUtils.mockDatanodeBlkPinning(dn, true); - favoredNodesCount--; - if (favoredNodesCount <= 0) { - break; // marked favoredNodesCount number of pinned block location - } - } - return file1; - } - - private void waitForAttemptedItems(long expectedBlkMovAttemptedCount, - int timeout) throws TimeoutException, InterruptedException { - BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager(); - final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier(); - GenericTestUtils.waitFor(new Supplier<Boolean>() { - @Override - public Boolean get() { - LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}", - expectedBlkMovAttemptedCount, - sps.getAttemptedItemsMonitor().getAttemptedItemsCount()); - return sps.getAttemptedItemsMonitor() - .getAttemptedItemsCount() == expectedBlkMovAttemptedCount; - } - }, 100, timeout); - } - - private void waitForBlocksMovementAttemptReport( - long expectedMovementFinishedBlocksCount, int timeout) - throws TimeoutException, InterruptedException { - BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager(); - final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier(); - GenericTestUtils.waitFor(new Supplier<Boolean>() { - @Override - public Boolean get() { - LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}", - expectedMovementFinishedBlocksCount, - sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()); - return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount() - >= expectedMovementFinishedBlocksCount; - } - }, 100, timeout); - } - - private void writeContent(final String fileName) throws IOException { - writeContent(fileName, (short) 3); - } - - private void writeContent(final String fileName, short replicatonFactor) - throws IOException { - // write to DISK - final FSDataOutputStream out = dfs.create(new Path(fileName), - replicatonFactor); - for (int i = 0; i < 1024; i++) { - out.write(i); - } - out.close(); - } - - private void startAdditionalDNs(final Configuration conf, - int newNodesRequired, int existingNodesNum, StorageType[][] newTypes, - int storagesPerDn, long nodeCapacity, final MiniDFSCluster cluster) - throws IOException { - long[][] capacities; - existingNodesNum += newNodesRequired; - capacities = new long[newNodesRequired][storagesPerDn]; - for (int i = 0; i < newNodesRequired; i++) { - for (int j = 0; j < storagesPerDn; j++) { - capacities[i][j] = nodeCapacity; - } - } - - cluster.startDataNodes(conf, newNodesRequired, newTypes, true, null, null, - null, capacities, null, false, false, false, null); - cluster.triggerHeartbeats(); - } - - private MiniDFSCluster startCluster(final Configuration conf, - StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn, - long nodeCapacity) throws IOException { - long[][] capacities = new long[numberOfDatanodes][storagesPerDn]; - for (int i = 0; i < numberOfDatanodes; i++) { - for (int j = 0; j < storagesPerDn; j++) { - capacities[i][j] = nodeCapacity; - } - } - final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn) - .storageTypes(storageTypes).storageCapacities(capacities).build(); - cluster.waitActive(); - return cluster; - } -}
--------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org