HDFS-12893. [READ] Support replication of Provided blocks with non-default topologies.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c89b29bd Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c89b29bd Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c89b29bd Branch: refs/heads/YARN-6592 Commit: c89b29bd421152f0e7e16936f18d9e852895c37a Parents: 0f6aa95 Author: Virajith Jalaparti <viraj...@apache.org> Authored: Fri Dec 8 14:52:48 2017 -0800 Committer: Chris Douglas <cdoug...@apache.org> Committed: Fri Dec 15 17:51:41 2017 -0800 ---------------------------------------------------------------------- .../server/blockmanagement/BlockManager.java | 30 +++++++++++- .../blockmanagement/DatanodeStorageInfo.java | 11 +++-- .../blockmanagement/ProvidedStorageMap.java | 18 ++++++- .../TestNameNodeProvidedImplementation.java | 49 ++++++++++++++++++-- 4 files changed, 97 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c89b29bd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 916cbaa..c1cd4db 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -2151,6 +2151,22 @@ public class BlockManager implements BlockStatsMXBean { } /** + * Get the associated {@link DatanodeDescriptor} for the storage. + * If the storage is of type PROVIDED, one of the nodes that reported + * PROVIDED storage is returned. 
If not, this is equivalent to + * {@code storage.getDatanodeDescriptor()}. + * @param storage + * @return the associated {@link DatanodeDescriptor}. + */ + private DatanodeDescriptor getDatanodeDescriptorFromStorage( + DatanodeStorageInfo storage) { + if (storage.getStorageType() == StorageType.PROVIDED) { + return providedStorageMap.chooseProvidedDatanode(); + } + return storage.getDatanodeDescriptor(); + } + + /** * Parse the data-nodes the block belongs to and choose a certain number * from them to be the recovery sources. * @@ -2198,10 +2214,14 @@ public class BlockManager implements BlockStatsMXBean { BitSet bitSet = isStriped ? new BitSet(((BlockInfoStriped) block).getTotalBlockNum()) : null; for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) { - final DatanodeDescriptor node = storage.getDatanodeDescriptor(); + final DatanodeDescriptor node = getDatanodeDescriptorFromStorage(storage); final StoredReplicaState state = checkReplicaOnStorage(numReplicas, block, storage, corruptReplicas.getNodes(block), false); if (state == StoredReplicaState.LIVE) { + if (storage.getStorageType() == StorageType.PROVIDED) { + storage = new DatanodeStorageInfo(node, storage.getStorageID(), + storage.getStorageType(), storage.getState()); + } nodesContainingLiveReplicas.add(storage); } containingNodes.add(node); @@ -4338,7 +4358,13 @@ public class BlockManager implements BlockStatsMXBean { Collection<DatanodeDescriptor> corruptNodes = corruptReplicas .getNodes(storedBlock); for (DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) { - final DatanodeDescriptor cur = storage.getDatanodeDescriptor(); + if (storage.getStorageType() == StorageType.PROVIDED + && storage.getState() == State.NORMAL) { + // assume the policy is satisfied for blocks on PROVIDED storage + // as long as the storage is in normal state. 
+ return true; + } + final DatanodeDescriptor cur = getDatanodeDescriptorFromStorage(storage); // Nodes under maintenance should be counted as valid replicas from // rack policy point of view. if (!cur.isDecommissionInProgress() && !cur.isDecommissioned() http://git-wip-us.apache.org/repos/asf/hadoop/blob/c89b29bd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java index 76bf915..3a56ef1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java @@ -120,10 +120,15 @@ public class DatanodeStorageInfo { private boolean blockContentsStale = true; DatanodeStorageInfo(DatanodeDescriptor dn, DatanodeStorage s) { + this(dn, s.getStorageID(), s.getStorageType(), s.getState()); + } + + DatanodeStorageInfo(DatanodeDescriptor dn, String storageID, + StorageType storageType, State state) { this.dn = dn; - this.storageID = s.getStorageID(); - this.storageType = s.getStorageType(); - this.state = s.getState(); + this.storageID = storageID; + this.storageType = storageType; + this.state = state; } public int getBlockReportCount() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/c89b29bd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java index 208ed3e..08d1434 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java @@ -192,7 +192,7 @@ public class ProvidedStorageMap { } public void updateStorage(DatanodeDescriptor node, DatanodeStorage storage) { - if (providedEnabled && storageId.equals(storage.getStorageID())) { + if (isProvidedStorage(storage.getStorageID())) { if (StorageType.PROVIDED.equals(storage.getStorageType())) { node.injectStorage(providedStorageInfo); return; @@ -204,6 +204,22 @@ public class ProvidedStorageMap { node.updateStorage(storage); } + private boolean isProvidedStorage(String dnStorageId) { + return providedEnabled && storageId.equals(dnStorageId); + } + + /** + * Choose a datanode that reported a volume of {@link StorageType} PROVIDED. + * + * @return the {@link DatanodeDescriptor} corresponding to a datanode that + * reported a volume with {@link StorageType} PROVIDED. If multiple + * datanodes report a PROVIDED volume, one is chosen uniformly at + * random. + */ + public DatanodeDescriptor chooseProvidedDatanode() { + return providedDescriptor.chooseRandom(); + } + /** * Builder used for creating {@link LocatedBlocks} when a block is provided. 
*/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/c89b29bd/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java index 394e8d8..2917a34 100644 --- a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java +++ b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.InMemoryLevelDBAl import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; @@ -201,8 +202,15 @@ public class TestNameNodeProvidedImplementation { void startCluster(Path nspath, int numDatanodes, StorageType[] storageTypes, StorageType[][] storageTypesPerDatanode, - boolean doFormat) - throws IOException { + boolean doFormat) throws IOException { + startCluster(nspath, numDatanodes, storageTypes, storageTypesPerDatanode, + doFormat, null); + } + + void startCluster(Path nspath, int numDatanodes, + StorageType[] storageTypes, + StorageType[][] storageTypesPerDatanode, + boolean doFormat, String[] racks) throws IOException { conf.set(DFS_NAMENODE_NAME_DIR_KEY, nspath.toString()); if (storageTypesPerDatanode != null) { @@ -211,6 +219,7 @@ public class TestNameNodeProvidedImplementation { 
.manageNameDfsDirs(doFormat) .numDataNodes(numDatanodes) .storageTypes(storageTypesPerDatanode) + .racks(racks) .build(); } else if (storageTypes != null) { cluster = new MiniDFSCluster.Builder(conf) @@ -219,12 +228,14 @@ public class TestNameNodeProvidedImplementation { .numDataNodes(numDatanodes) .storagesPerDatanode(storageTypes.length) .storageTypes(storageTypes) + .racks(racks) .build(); } else { cluster = new MiniDFSCluster.Builder(conf) .format(doFormat) .manageNameDfsDirs(doFormat) .numDataNodes(numDatanodes) + .racks(racks) .build(); } cluster.waitActive(); @@ -515,11 +526,12 @@ public class TestNameNodeProvidedImplementation { StorageType.PROVIDED, StorageType.DISK}, null, false); + setAndUnsetReplication("/" + filePrefix + (numFiles - 1) + fileSuffix); + } - String filename = "/" + filePrefix + (numFiles - 1) + fileSuffix; + private void setAndUnsetReplication(String filename) throws Exception { Path file = new Path(filename); FileSystem fs = cluster.getFileSystem(); - // set the replication to 4, and test that the file has // the required replication. 
short newReplication = 4; @@ -833,7 +845,7 @@ public class TestNameNodeProvidedImplementation { new StorageType[] {StorageType.PROVIDED, StorageType.DISK}, null, false); - int fileIndex = numFiles -1; + int fileIndex = numFiles - 1; final BlockManager blockManager = cluster.getNamesystem().getBlockManager(); final DatanodeManager dnm = blockManager.getDatanodeManager(); @@ -890,4 +902,31 @@ public class TestNameNodeProvidedImplementation { // reports all 3 replicas verifyFileLocation(fileIndex, 3); } + + @Test + public void testProvidedWithHierarchicalTopology() throws Exception { + conf.setClass(ImageWriter.Options.UGI_CLASS, FsUGIResolver.class, + UGIResolver.class); + String packageName = "org.apache.hadoop.hdfs.server.blockmanagement"; + String[] policies = new String[] { + "BlockPlacementPolicyDefault", + "BlockPlacementPolicyRackFaultTolerant", + "BlockPlacementPolicyWithNodeGroup", + "BlockPlacementPolicyWithUpgradeDomain"}; + createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH, + FixedBlockResolver.class); + String[] racks = + {"/pod0/rack0", "/pod0/rack0", "/pod0/rack1", "/pod0/rack1", + "/pod1/rack0", "/pod1/rack0", "/pod1/rack1", "/pod1/rack1" }; + for (String policy: policies) { + LOG.info("Using policy: " + packageName + "." + policy); + conf.set(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, packageName + "." + policy); + startCluster(NNDIRPATH, racks.length, + new StorageType[]{StorageType.PROVIDED, StorageType.DISK}, + null, false, racks); + verifyFileSystemContents(); + setAndUnsetReplication("/" + filePrefix + (numFiles - 1) + fileSuffix); + cluster.shutdown(); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org