HDFS-11965: [SPS]: Should give chance to satisfy the low redundant blocks before removing the xattr. Contributed by Surendra Singh Lilhore.
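
In short: before this change the SPS could clean up the satisfy-storage-policy xattr for a file even while some of its blocks were still under-replicated; with this change the satisfier first checks for low-redundancy blocks and, if any exist, puts the track ID back on the pending queue to retry later. Below is a minimal sketch of that decision, assuming simplified stand-in names (SpsRetrySketch, Block, decideAfterAnalysis are illustrative only, not the real HDFS classes); the actual implementation is BlockManager#hasLowRedundancyBlocks and the new FEW_LOW_REDUNDANCY_BLOCKS status in StoragePolicySatisfier, shown in the diff below.

// Illustrative sketch only; simplified stand-ins, not the real HDFS classes.
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

class SpsRetrySketch {

  /** Simplified block view: expected vs. live replica counts. */
  static final class Block {
    final int expectedReplicas;
    final int liveReplicas;

    Block(int expectedReplicas, int liveReplicas) {
      this.expectedReplicas = expectedReplicas;
      this.liveReplicas = liveReplicas;
    }
  }

  private final Queue<Long> pendingQueue = new ArrayDeque<>();

  /** Same idea as BlockManager#hasLowRedundancyBlocks in the patch below. */
  boolean hasLowRedundancyBlocks(List<Block> blocks) {
    for (Block b : blocks) {
      if (b.expectedReplicas > b.liveReplicas) {
        return true;
      }
    }
    return false;
  }

  /**
   * Decision the patch adds: if the file's blocks look satisfied but some
   * are still under-replicated, retry later instead of cleaning the xattr.
   */
  void decideAfterAnalysis(long trackId, List<Block> blocks) {
    if (hasLowRedundancyBlocks(blocks)) {
      pendingQueue.add(trackId);      // give the file another chance later
    } else {
      removeSatisfyStoragePolicyXattr(trackId);
    }
  }

  private void removeSatisfyStoragePolicyXattr(long trackId) {
    // placeholder: in HDFS this clears the satisfy-storage-policy xattr
  }
}
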
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/863f4f9f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/863f4f9f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/863f4f9f

Branch: refs/heads/HDFS-10285
Commit: 863f4f9f6f7784c55db75aec207ee7120b27d15a
Parents: b8324ab
Author: Uma Maheswara Rao G <uma.ganguma...@intel.com>
Authored: Mon Jul 10 18:00:58 2017 -0700
Committer: Rakesh Radhakrishnan <rake...@apache.org>
Committed: Mon Jan 29 09:20:28 2018 +0530

----------------------------------------------------------------------
 .../server/blockmanagement/BlockManager.java    |  15 +++
 .../server/namenode/StoragePolicySatisfier.java |  20 +++-
 .../namenode/TestStoragePolicySatisfier.java    | 102 ++++++++++++++++++-
 ...stStoragePolicySatisfierWithStripedFile.java |  90 ++++++++++++++++
 4 files changed, 224 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/863f4f9f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index dd491cd..6dd743a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -4325,6 +4325,21 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
+   * Check file has low redundancy blocks.
+   */
+  public boolean hasLowRedundancyBlocks(BlockCollection bc) {
+    boolean result = false;
+    for (BlockInfo block : bc.getBlocks()) {
+      short expected = getExpectedRedundancyNum(block);
+      final NumberReplicas n = countNodes(block);
+      if (expected > n.liveReplicas()) {
+        result = true;
+      }
+    }
+    return result;
+  }
+
+  /**
    * Check sufficient redundancy of the blocks in the collection. If any block
    * is needed reconstruction, insert it into the reconstruction queue.
    * Otherwise, if the block is more than the expected replication factor,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/863f4f9f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 1b2afa3..97cbf1b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -99,7 +99,10 @@ public class StoragePolicySatisfier implements Runnable {
     // Represents that, the analysis skipped due to some conditions.
    // Example conditions are if no blocks really exists in block collection or
    // if analysis is not required on ec files with unsuitable storage policies
-    BLOCKS_TARGET_PAIRING_SKIPPED;
+    BLOCKS_TARGET_PAIRING_SKIPPED,
+    // Represents that all the reported blocks satisfy the policy but
+    // some of the blocks are low redundant.
+    FEW_LOW_REDUNDANCY_BLOCKS
   }
 
   public StoragePolicySatisfier(final Namesystem namesystem,
@@ -247,6 +250,14 @@ public class StoragePolicySatisfier implements Runnable {
       case FEW_BLOCKS_TARGETS_PAIRED:
         this.storageMovementsMonitor.add(blockCollectionID, false);
         break;
+      case FEW_LOW_REDUNDANCY_BLOCKS:
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Adding trackID " + blockCollectionID
+              + " back to retry queue as some of the blocks"
+              + " are low redundant.");
+        }
+        this.storageMovementNeeded.add(blockCollectionID);
+        break;
       // Just clean Xattrs
       case BLOCKS_TARGET_PAIRING_SKIPPED:
       case BLOCKS_ALREADY_SATISFIED:
@@ -347,11 +358,16 @@ public class StoragePolicySatisfier implements Runnable {
           boolean computeStatus = computeBlockMovingInfos(blockMovingInfos,
               blockInfo, expectedStorageTypes, existing, storages);
           if (computeStatus
-              && status != BlocksMovingAnalysisStatus.FEW_BLOCKS_TARGETS_PAIRED) {
+              && status != BlocksMovingAnalysisStatus.FEW_BLOCKS_TARGETS_PAIRED
+              && !blockManager.hasLowRedundancyBlocks(blockCollection)) {
             status = BlocksMovingAnalysisStatus.ALL_BLOCKS_TARGETS_PAIRED;
           } else {
             status = BlocksMovingAnalysisStatus.FEW_BLOCKS_TARGETS_PAIRED;
           }
+        } else {
+          if (blockManager.hasLowRedundancyBlocks(blockCollection)) {
+            status = BlocksMovingAnalysisStatus.FEW_LOW_REDUNDANCY_BLOCKS;
+          }
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/863f4f9f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index f1a4169..7127895 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 
 import java.io.FileNotFoundException;
@@ -29,6 +30,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.ReconfigurationException;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -41,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
@@ -55,6 +58,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -582,7 +586,9 @@ public class TestStoragePolicySatisfier {
       Assert.assertTrue("SPS should be running as "
           + "no Mover really running", running);
     } finally {
-      hdfsCluster.shutdown();
+      if (hdfsCluster != null) {
+        hdfsCluster.shutdown();
+      }
     }
   }
 
@@ -983,6 +989,100 @@ public class TestStoragePolicySatisfier {
     }
   }
 
+  /**
+   * Test SPS for low redundant file blocks.
+   * 1. Create cluster with 3 datanodes.
+   * 2. Create one file with 3 replicas.
+   * 3. Set policy and call satisfyStoragePolicy for file.
+   * 4. Stop NameNode and DataNodes.
+   * 5. Start NameNode with 2 datanodes and wait for block movement.
+   * 6. Start third datanode.
+   * 7. Third datanode replica should also be moved to proper
+   * storage based on policy.
+   */
+  @Test(timeout = 300000)
+  public void testSPSWhenFileHasLowRedundancyBlocks() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      conf.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+          "3000");
+      StorageType[][] newtypes = new StorageType[][] {
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK}};
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+          .storageTypes(newtypes).build();
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path filePath = new Path("/zeroSizeFile");
+      DFSTestUtil.createFile(fs, filePath, 1024, (short) 3, 0);
+      fs.setStoragePolicy(filePath, "COLD");
+      List<DataNodeProperties> list = new ArrayList<>();
+      list.add(cluster.stopDataNode(0));
+      list.add(cluster.stopDataNode(0));
+      list.add(cluster.stopDataNode(0));
+      cluster.restartNameNodes();
+      cluster.restartDataNode(list.get(0), true);
+      cluster.restartDataNode(list.get(1), true);
+      cluster.waitActive();
+      fs.satisfyStoragePolicy(filePath);
+      Thread.sleep(3000 * 6);
+      cluster.restartDataNode(list.get(2), true);
+      DFSTestUtil.waitExpectedStorageType(filePath.toString(),
+          StorageType.ARCHIVE, 3, 30000, cluster.getFileSystem());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test SPS for extra redundant file blocks.
+   * 1. Create cluster with 5 datanodes.
+   * 2. Create one file with 5 replicas.
+   * 3. Set file replication to 3.
+   * 4. Set policy and call satisfyStoragePolicy for file.
+   * 5. Blocks should be moved successfully.
+   */
+  @Test(timeout = 300000)
+  public void testSPSWhenFileHasExcessRedundancyBlocks() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      conf.set(DFSConfigKeys
+          .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+          "3000");
+      StorageType[][] newtypes = new StorageType[][] {
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK},
+          {StorageType.ARCHIVE, StorageType.DISK}};
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5)
+          .storageTypes(newtypes).build();
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path filePath = new Path("/zeroSizeFile");
+      DFSTestUtil.createFile(fs, filePath, 1024, (short) 5, 0);
+      fs.setReplication(filePath, (short) 3);
+      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
+          LogFactory.getLog(BlockStorageMovementAttemptedItems.class));
+      fs.setStoragePolicy(filePath, "COLD");
+      fs.satisfyStoragePolicy(filePath);
+      DFSTestUtil.waitExpectedStorageType(filePath.toString(),
+          StorageType.ARCHIVE, 3, 30000, cluster.getFileSystem());
+      assertFalse("Log output should not contain low redundancy message: ",
+          logs.getOutput().contains("some of the blocks are low redundant"));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
       throws IOException {
     ArrayList<DataNode> dns = hdfsCluster.getDataNodes();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/863f4f9f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
index eb4a6a3..195c9e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
@@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs.server.namenode;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -27,8 +29,10 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
@@ -293,6 +297,92 @@ public class TestStoragePolicySatisfierWithStripedFile {
   }
 
   /**
+   * Test SPS for low redundant file blocks.
+   * 1. Create cluster with 10 datanodes.
+   * 2. Create one striped file with default EC Policy.
+   * 3. Set policy and call satisfyStoragePolicy for file.
+   * 4. Stop NameNode and DataNodes.
+   * 5. Start NameNode with 5 datanodes and wait for block movement.
+   * 6. Start remaining 5 datanodes.
+   * 7. All replicas should be moved to proper storage based on policy.
+   */
+  @Test(timeout = 300000)
+  public void testSPSWhenFileHasLowRedundancyBlocks() throws Exception {
+    // start 10 datanodes
+    int numOfDatanodes = 10;
+    int storagesPerDatanode = 2;
+    long capacity = 20 * defaultStripeBlockSize;
+    long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
+    for (int i = 0; i < numOfDatanodes; i++) {
+      for (int j = 0; j < storagesPerDatanode; j++) {
+        capacities[i][j] = capacity;
+      }
+    }
+
+    final Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys
+        .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+        "3000");
+    initConfWithStripe(conf, defaultStripeBlockSize);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numOfDatanodes)
+        .storagesPerDatanode(storagesPerDatanode)
+        .storageTypes(new StorageType[][]{
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE}})
+        .storageCapacities(capacities)
+        .build();
+    try {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path barDir = new Path("/bar");
+      fs.mkdirs(barDir);
+      // set an EC policy on "/bar" directory
+      fs.setErasureCodingPolicy(barDir, null);
+
+      // write file to barDir
+      final Path fooFile = new Path("/bar/foo");
+      long fileLen = cellSize * dataBlocks;
+      DFSTestUtil.createFile(cluster.getFileSystem(), fooFile,
+          fileLen, (short) 3, 0);
+
+      // Move file to ARCHIVE.
+      fs.setStoragePolicy(barDir, "COLD");
+      // Stop DataNodes and restart NameNode
+      List<DataNodeProperties> list = new ArrayList<>(numOfDatanodes);
+      for (int i = 0; i < numOfDatanodes; i++) {
+        list.add(cluster.stopDataNode(0));
+      }
+      cluster.restartNameNodes();
+      // Restart half of the datanodes
+      for (int i = 0; i < numOfDatanodes / 2; i++) {
+        cluster.restartDataNode(list.get(i), true);
+      }
+      cluster.waitActive();
+      fs.satisfyStoragePolicy(fooFile);
+      Thread.sleep(3000 * 6);
+      // Start the remaining datanodes
+      for (int i = numOfDatanodes - 1; i > numOfDatanodes / 2; i--) {
+        cluster.restartDataNode(list.get(i), true);
+      }
+      // verify storage types and locations.
+      waitExpectedStorageType(cluster, fooFile.toString(), fileLen,
+          StorageType.ARCHIVE, 9, 9, 60000);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+
+  /**
    * Tests to verify that for the given path, no blocks under the given path
    * will be scheduled for block movement as there are no available datanode
    * with required storage type.

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org