Repository: hadoop Updated Branches: refs/heads/branch-2 89fc7fe67 -> b3ea11dfd
HDFS-11482. Add storage type demand to into DFSNetworkTopology#chooseRandom. Contributed by Chen Liang. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b3ea11df Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b3ea11df Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b3ea11df Branch: refs/heads/branch-2 Commit: b3ea11dfdb46fcec86118a132bee9a9978df21dd Parents: 89fc7fe Author: Arpit Agarwal <a...@apache.org> Authored: Mon Aug 21 14:07:59 2017 -0700 Committer: Arpit Agarwal <a...@apache.org> Committed: Mon Aug 21 14:07:59 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/net/InnerNodeImpl.java | 8 +- .../net/NetworkTopologyWithNodeGroup.java | 2 +- .../hadoop/hdfs/net/DFSNetworkTopology.java | 289 ++++++++++++ .../hadoop/hdfs/net/DFSTopologyNodeImpl.java | 277 ++++++++++++ .../blockmanagement/DatanodeDescriptor.java | 10 + .../apache/hadoop/hdfs/DFSNetworkTopology.java | 36 -- .../apache/hadoop/hdfs/DFSTopologyNodeImpl.java | 255 ----------- .../hadoop/hdfs/TestDFSNetworkTopology.java | 260 ----------- .../hadoop/hdfs/net/TestDFSNetworkTopology.java | 449 +++++++++++++++++++ 9 files changed, 1030 insertions(+), 556 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ea11df/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/InnerNodeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/InnerNodeImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/InnerNodeImpl.java index 81eaf7f..5a2931b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/InnerNodeImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/InnerNodeImpl.java @@ -63,7 +63,7 @@ public class InnerNodeImpl extends NodeBase implements InnerNode { /** Judge if this node represents a rack * @return true if it has no child or its children are not InnerNodes */ - boolean isRack() { + public boolean isRack() { if (children.isEmpty()) { return true; } @@ -81,7 +81,7 @@ public class InnerNodeImpl extends NodeBase implements InnerNode { * @param n a node * @return true if this node is an ancestor of <i>n</i> */ - protected boolean isAncestor(Node n) { + public boolean isAncestor(Node n) { return getPath(this).equals(NodeBase.PATH_SEPARATOR_STR) || (n.getNetworkLocation()+NodeBase.PATH_SEPARATOR_STR). 
startsWith(getPath(this)+NodeBase.PATH_SEPARATOR_STR); @@ -92,12 +92,12 @@ public class InnerNodeImpl extends NodeBase implements InnerNode { * @param n a node * @return true if this node is the parent of <i>n</i> */ - protected boolean isParent(Node n) { + public boolean isParent(Node n) { return n.getNetworkLocation().equals(getPath(this)); } /* Return a child name of this node who is an ancestor of node <i>n</i> */ - protected String getNextAncestorName(Node n) { + public String getNextAncestorName(Node n) { if (!isAncestor(n)) { throw new IllegalArgumentException( this + "is not an ancestor of " + n); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ea11df/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java index a20d5fc..bec0fe1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java @@ -308,7 +308,7 @@ public class NetworkTopologyWithNodeGroup extends NetworkTopology { } @Override - boolean isRack() { + public boolean isRack() { // it is node group if (getChildren().isEmpty()) { return false; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ea11df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSNetworkTopology.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSNetworkTopology.java new file mode 100644 index 0000000..ee83dba --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSNetworkTopology.java @@ -0,0 +1,289 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.net; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.net.Node; +import org.apache.hadoop.net.NodeBase; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Random; + +/** + * The HDFS specific network topology class. 
The main purpose of doing this + * subclassing is to add storage-type-aware chooseRandom method. All the + * remaining parts should be the same. + * + * Currently a placeholder to test storage type info. + * TODO : add "chooseRandom with storageType info" function. + */ +public class DFSNetworkTopology extends NetworkTopology { + + private static final Random RANDOM = new Random(); + + public static DFSNetworkTopology getInstance(Configuration conf) { + DFSNetworkTopology nt = new DFSNetworkTopology(); + return (DFSNetworkTopology)nt.init(DFSTopologyNodeImpl.FACTORY); + } + + /** + * Randomly choose one node from <i>scope</i>, with specified storage type. + * + * If scope starts with ~, choose one from the all nodes except for the + * ones in <i>scope</i>; otherwise, choose one from <i>scope</i>. + * If excludedNodes is given, choose a node that's not in excludedNodes. + * + * @param scope range of nodes from which a node will be chosen + * @param excludedNodes nodes to be excluded from + * @return the chosen node + */ + public Node chooseRandomWithStorageType(final String scope, + final Collection<Node> excludedNodes, StorageType type) { + netlock.readLock().lock(); + try { + if (scope.startsWith("~")) { + return chooseRandomWithStorageType( + NodeBase.ROOT, scope.substring(1), excludedNodes, type); + } else { + return chooseRandomWithStorageType( + scope, null, excludedNodes, type); + } + } finally { + netlock.readLock().unlock(); + } + } + + /** + * Choose a random node based on given scope, excludedScope and excludedNodes + * set. Although in general the topology has at most three layers, this class + * will not impose such assumption. + * + * At high level, the idea is like this, say: + * + * R has two children A and B, and storage type is X, say: + * A has X = 6 (rooted at A there are 6 datanodes with X) and B has X = 8. + * + * Then R will generate a random int between 1~14, if it's <= 6, recursively + * call into A, otherwise B. This will maintain a uniformed randomness of + * choosing datanodes. + * + * The tricky part is how to handle excludes. + * + * For excludedNodes, since this set is small: currently the main reason of + * being an excluded node is because it already has a replica. So randomly + * picking up this node again should be rare. Thus we only check that, if the + * chosen node is excluded, we do chooseRandom again. + * + * For excludedScope, we locate the root of the excluded scope. Subtracting + * all it's ancestors' storage counters accordingly, this way the excluded + * root is out of the picture. + * + * TODO : this function has duplicate code as NetworkTopology, need to + * refactor in the future. + * + * @param scope + * @param excludedScope + * @param excludedNodes + * @return + */ + @VisibleForTesting + Node chooseRandomWithStorageType(final String scope, + String excludedScope, final Collection<Node> excludedNodes, + StorageType type) { + if (excludedScope != null) { + if (scope.startsWith(excludedScope)) { + return null; + } + if (!excludedScope.startsWith(scope)) { + excludedScope = null; + } + } + Node node = getNode(scope); + if (node == null) { + LOG.debug("Invalid scope {}, non-existing node", scope); + return null; + } + if (!(node instanceof DFSTopologyNodeImpl)) { + // a node is either DFSTopologyNodeImpl, or a DatanodeDescriptor + return ((DatanodeDescriptor)node).hasStorageType(type) ? node : null; + } + DFSTopologyNodeImpl root = (DFSTopologyNodeImpl)node; + Node excludeRoot = excludedScope == null ? 
null : getNode(excludedScope); + + // check to see if there are nodes satisfying the condition at all + int availableCount = root.getSubtreeStorageCount(type); + if (excludeRoot != null && root.isAncestor(excludeRoot)) { + if (excludeRoot instanceof DFSTopologyNodeImpl) { + availableCount -= ((DFSTopologyNodeImpl)excludeRoot) + .getSubtreeStorageCount(type); + } else { + availableCount -= ((DatanodeDescriptor)excludeRoot) + .hasStorageType(type) ? 1 : 0; + } + } + if (excludedNodes != null) { + for (Node excludedNode : excludedNodes) { + // all excluded nodes should be DatanodeDescriptor + Preconditions.checkArgument(excludedNode instanceof DatanodeDescriptor); + availableCount -= ((DatanodeDescriptor) excludedNode) + .hasStorageType(type) ? 1 : 0; + } + } + if (availableCount <= 0) { + // should never be <0 in general, adding <0 check for safety purpose + return null; + } + // to this point, it is guaranteed that there is at least one node + // that satisfies the requirement, keep trying until we found one. + Node chosen; + do { + chosen = chooseRandomWithStorageTypeAndExcludeRoot(root, excludeRoot, + type); + if (excludedNodes == null || !excludedNodes.contains(chosen)) { + break; + } else { + LOG.debug("Node {} is excluded, continuing.", chosen); + } + } while (true); + LOG.debug("chooseRandom returning {}", chosen); + return chosen; + } + + /** + * Choose a random node that has the required storage type, under the given + * root, with an excluded subtree root (could also just be a leaf node). + * + * Note that excludedNode is checked after a random node, so it is not being + * handled here. + * + * @param root the root node where we start searching for a datanode + * @param excludeRoot the root of the subtree what should be excluded + * @param type the expected storage type + * @return a random datanode, with the storage type, and is not in excluded + * scope + */ + private Node chooseRandomWithStorageTypeAndExcludeRoot( + DFSTopologyNodeImpl root, Node excludeRoot, StorageType type) { + Node chosenNode; + if (root.isRack()) { + // children are datanode descriptor + ArrayList<Node> candidates = new ArrayList<>(); + for (Node node : root.getChildren()) { + if (node.equals(excludeRoot)) { + continue; + } + DatanodeDescriptor dnDescriptor = (DatanodeDescriptor)node; + if (dnDescriptor.hasStorageType(type)) { + candidates.add(node); + } + } + if (candidates.size() == 0) { + return null; + } + // to this point, all nodes in candidates are valid choices, and they are + // all datanodes, pick a random one. + chosenNode = candidates.get(RANDOM.nextInt(candidates.size())); + } else { + // the children are inner nodes + ArrayList<DFSTopologyNodeImpl> candidates = + getEligibleChildren(root, excludeRoot, type); + if (candidates.size() == 0) { + return null; + } + // again, all children are also inner nodes, we can do this cast. + // to maintain uniformality, the search needs to be based on the counts + // of valid datanodes. Below is a random weighted choose. + int totalCounts = 0; + int[] countArray = new int[candidates.size()]; + for (int i = 0; i < candidates.size(); i++) { + DFSTopologyNodeImpl innerNode = candidates.get(i); + int subTreeCount = innerNode.getSubtreeStorageCount(type); + totalCounts += subTreeCount; + countArray[i] = subTreeCount; + } + // generate a random val between [1, totalCounts] + int randomCounts = RANDOM.nextInt(totalCounts) + 1; + int idxChosen = 0; + // searching for the idxChosen can potentially be done with binary + // search, but does not seem to worth it here. 
+ for (int i = 0; i < countArray.length; i++) { + if (randomCounts <= countArray[i]) { + idxChosen = i; + break; + } + randomCounts -= countArray[i]; + } + DFSTopologyNodeImpl nextRoot = candidates.get(idxChosen); + chosenNode = chooseRandomWithStorageTypeAndExcludeRoot( + nextRoot, excludeRoot, type); + } + return chosenNode; + } + + /** + * Given root, excluded root and storage type. Find all the children of the + * root, that has the storage type available. One check is that if the + * excluded root is under a children, this children must subtract the storage + * count of the excluded root. + * @param root the subtree root we check. + * @param excludeRoot the root of the subtree that should be excluded. + * @param type the storage type we look for. + * @return a list of possible nodes, each of them is eligible as the next + * level root we search. + */ + private ArrayList<DFSTopologyNodeImpl> getEligibleChildren( + DFSTopologyNodeImpl root, Node excludeRoot, StorageType type) { + ArrayList<DFSTopologyNodeImpl> candidates = new ArrayList<>(); + int excludeCount = 0; + if (excludeRoot != null && root.isAncestor(excludeRoot)) { + // the subtree to be excluded is under the given root, + // find out the number of nodes to be excluded. + if (excludeRoot instanceof DFSTopologyNodeImpl) { + // if excludedRoot is an inner node, get the counts of all nodes on + // this subtree of that storage type. + excludeCount = ((DFSTopologyNodeImpl) excludeRoot) + .getSubtreeStorageCount(type); + } else { + // if excludedRoot is a datanode, simply ignore this one node + if (((DatanodeDescriptor) excludeRoot).hasStorageType(type)) { + excludeCount = 1; + } + } + } + // have calculated the number of storage counts to be excluded. + // walk through all children to check eligibility. + for (Node node : root.getChildren()) { + DFSTopologyNodeImpl dfsNode = (DFSTopologyNodeImpl) node; + int storageCount = dfsNode.getSubtreeStorageCount(type); + if (excludeRoot != null && excludeCount != 0 && + (dfsNode.isAncestor(excludeRoot) || dfsNode.equals(excludeRoot))) { + storageCount -= excludeCount; + } + if (storageCount > 0) { + candidates.add(dfsNode); + } + } + return candidates; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ea11df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSTopologyNodeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSTopologyNodeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSTopologyNodeImpl.java new file mode 100644 index 0000000..c00978d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSTopologyNodeImpl.java @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.net; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.net.InnerNode; +import org.apache.hadoop.net.InnerNodeImpl; +import org.apache.hadoop.net.Node; + +import java.util.EnumMap; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; + +/** + * The HDFS-specific representation of a network topology inner node. The + * difference is this class includes the information about the storage type + * info of this subtree. This info will be used when selecting subtrees + * in block placement. + */ +public class DFSTopologyNodeImpl extends InnerNodeImpl { + + static final InnerNodeImpl.Factory FACTORY + = new DFSTopologyNodeImpl.Factory(); + + static final class Factory extends InnerNodeImpl.Factory { + private Factory() {} + + @Override + public InnerNodeImpl newInnerNode(String path) { + return new DFSTopologyNodeImpl(path); + } + } + + /** + * The core data structure of this class. The information about what storage + * types this subtree has. Basically, a map whose key is a child + * id, value is a enum map including the counts of each storage type. e.g. + * DISK type has count 5 means there are 5 leaf datanodes with DISK type + * available. This value is set/updated upon datanode joining and leaving. + * + * NOTE : It might be sufficient to keep only a map from storage type + * to count, omitting the child node id. But this might make it hard to keep + * consistency when there are updates from children. + * + * For example, if currently R has two children A and B with storage X, Y, and + * A : X=1 Y=1 + * B : X=2 Y=2 + * so we store X=3 Y=3 as total on R. + * + * Now say A has a new X plugged in and becomes X=2 Y=1. + * + * If we know that "A adds one X", it is easy to update R by +1 on X. However, + * if we don't know "A adds one X", but instead got "A now has X=2 Y=1", + * (which seems to be the case in current heartbeat) we will not know how to + * update R. While if we store on R "A has X=1 and Y=1" then we can simply + * update R by completely replacing the A entry and all will be good. 
+ */ + private final HashMap + <String, EnumMap<StorageType, Integer>> childrenStorageInfo; + + DFSTopologyNodeImpl(String path) { + super(path); + childrenStorageInfo = new HashMap<>(); + } + + DFSTopologyNodeImpl( + String name, String location, InnerNode parent, int level) { + super(name, location, parent, level); + childrenStorageInfo = new HashMap<>(); + } + + public int getSubtreeStorageCount(StorageType type) { + int res = 0; + for (Map.Entry<String, EnumMap<StorageType, Integer>> entry : + childrenStorageInfo.entrySet()) { + if (entry.getValue().containsKey(type)) { + res += entry.getValue().get(type); + } + } + return res; + } + + int getNumOfChildren() { + return children.size(); + } + + @Override + public boolean add(Node n) { + if (!isAncestor(n)) { + throw new IllegalArgumentException(n.getName() + + ", which is located at " + n.getNetworkLocation() + + ", is not a descendant of " + getPath(this)); + } + // In HDFS topology, the leaf node should always be DatanodeDescriptor + if (!(n instanceof DatanodeDescriptor)) { + throw new IllegalArgumentException("Unexpected node type " + + n.getClass().getName()); + } + DatanodeDescriptor dnDescriptor = (DatanodeDescriptor) n; + if (isParent(n)) { + // this node is the parent of n; add n directly + n.setParent(this); + n.setLevel(this.level + 1); + Node prev = childrenMap.put(n.getName(), n); + if (prev != null) { + for(int i=0; i<children.size(); i++) { + if (children.get(i).getName().equals(n.getName())) { + children.set(i, n); + return false; + } + } + } + children.add(n); + numOfLeaves++; + synchronized (childrenStorageInfo) { + if (!childrenStorageInfo.containsKey(dnDescriptor.getName())) { + childrenStorageInfo.put( + dnDescriptor.getName(), + new EnumMap<StorageType, Integer>(StorageType.class)); + } + for (StorageType st : dnDescriptor.getStorageTypes()) { + childrenStorageInfo.get(dnDescriptor.getName()).put(st, 1); + } + } + return true; + } else { + // find the next ancestor node + String parentName = getNextAncestorName(n); + InnerNode parentNode = (InnerNode)childrenMap.get(parentName); + if (parentNode == null) { + // create a new InnerNode + parentNode = createParentNode(parentName); + children.add(parentNode); + childrenMap.put(parentNode.getName(), parentNode); + } + // add n to the subtree of the next ancestor node + if (parentNode.add(n)) { + numOfLeaves++; + synchronized (childrenStorageInfo) { + if (!childrenStorageInfo.containsKey(parentNode.getName())) { + childrenStorageInfo.put( + parentNode.getName(), + new EnumMap<StorageType, Integer>(StorageType.class)); + for (StorageType st : dnDescriptor.getStorageTypes()) { + childrenStorageInfo.get(parentNode.getName()).put(st, 1); + } + } else { + EnumMap<StorageType, Integer> currentCount = + childrenStorageInfo.get(parentNode.getName()); + for (StorageType st : dnDescriptor.getStorageTypes()) { + if (currentCount.containsKey(st)) { + currentCount.put(st, currentCount.get(st) + 1); + } else { + currentCount.put(st, 1); + } + } + } + } + return true; + } else { + return false; + } + } + } + + @VisibleForTesting + HashMap <String, EnumMap<StorageType, Integer>> getChildrenStorageInfo() { + return childrenStorageInfo; + } + + + private DFSTopologyNodeImpl createParentNode(String parentName) { + return new DFSTopologyNodeImpl( + parentName, getPath(this), this, this.getLevel() + 1); + } + + @Override + public boolean equals(Object o) { + return super.equals(o); + } + + @Override + public int hashCode() { + return super.hashCode(); + } + + @Override + public boolean 
remove(Node n) { + if (!isAncestor(n)) { + throw new IllegalArgumentException(n.getName() + + ", which is located at " + n.getNetworkLocation() + + ", is not a descendant of " + getPath(this)); + } + // In HDFS topology, the leaf node should always be DatanodeDescriptor + if (!(n instanceof DatanodeDescriptor)) { + throw new IllegalArgumentException("Unexpected node type " + + n.getClass().getName()); + } + DatanodeDescriptor dnDescriptor = (DatanodeDescriptor) n; + if (isParent(n)) { + // this node is the parent of n; remove n directly + if (childrenMap.containsKey(n.getName())) { + for (int i=0; i<children.size(); i++) { + if (children.get(i).getName().equals(n.getName())) { + children.remove(i); + childrenMap.remove(n.getName()); + synchronized (childrenStorageInfo) { + childrenStorageInfo.remove(dnDescriptor.getName()); + } + numOfLeaves--; + n.setParent(null); + return true; + } + } + } + return false; + } else { + // find the next ancestor node: the parent node + String parentName = getNextAncestorName(n); + DFSTopologyNodeImpl parentNode = + (DFSTopologyNodeImpl)childrenMap.get(parentName); + if (parentNode == null) { + return false; + } + // remove n from the parent node + boolean isRemoved = parentNode.remove(n); + if (isRemoved) { + // if the parent node has no children, remove the parent node too + synchronized (childrenStorageInfo) { + EnumMap<StorageType, Integer> currentCount = + childrenStorageInfo.get(parentNode.getName()); + EnumSet<StorageType> toRemove = EnumSet.noneOf(StorageType.class); + for (StorageType st : dnDescriptor.getStorageTypes()) { + int newCount = currentCount.get(st) - 1; + if (newCount == 0) { + toRemove.add(st); + } + currentCount.put(st, newCount); + } + for (StorageType st : toRemove) { + currentCount.remove(st); + } + } + if (parentNode.getNumOfChildren() == 0) { + for(int i=0; i < children.size(); i++) { + if (children.get(i).getName().equals(parentName)) { + children.remove(i); + childrenMap.remove(parentName); + childrenStorageInfo.remove(parentNode.getName()); + break; + } + } + } + numOfLeaves--; + } + return isRemoved; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ea11df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 3cf3ea0..77ab6b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -923,5 +923,15 @@ public class DatanodeDescriptor extends DatanodeInfo { public boolean isRegistered() { return isAlive() && !forceRegistration; } + + + public boolean hasStorageType(StorageType type) { + for (DatanodeStorageInfo dnStorage : getStorageInfos()) { + if (dnStorage.getStorageType() == type) { + return true; + } + } + return false; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ea11df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSNetworkTopology.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSNetworkTopology.java deleted file mode 100644 index a6b8c00..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSNetworkTopology.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.net.NetworkTopology; - -/** - * The HDFS specific network topology class. The main purpose of doing this - * subclassing is to add storage-type-aware chooseRandom method. All the - * remaining parts should be the same. - * - * Currently a placeholder to test storage type info. - * TODO : add "chooseRandom with storageType info" function. - */ -public class DFSNetworkTopology extends NetworkTopology { - public static DFSNetworkTopology getInstance(Configuration conf) { - DFSNetworkTopology nt = new DFSNetworkTopology(); - return (DFSNetworkTopology)nt.init(DFSTopologyNodeImpl.FACTORY); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ea11df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTopologyNodeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTopologyNodeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTopologyNodeImpl.java deleted file mode 100644 index aee9fa3..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTopologyNodeImpl.java +++ /dev/null @@ -1,255 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; -import org.apache.hadoop.net.InnerNode; -import org.apache.hadoop.net.InnerNodeImpl; -import org.apache.hadoop.net.Node; - -import java.util.EnumMap; -import java.util.EnumSet; -import java.util.HashMap; - -/** - * The HDFS-specific representation of a network topology inner node. The - * difference is this class includes the information about the storage type - * info of this subtree. This info will be used when selecting subtrees - * in block placement. - */ -public class DFSTopologyNodeImpl extends InnerNodeImpl { - - static final InnerNodeImpl.Factory FACTORY - = new DFSTopologyNodeImpl.Factory(); - - static final class Factory extends InnerNodeImpl.Factory { - private Factory() {} - - @Override - public InnerNodeImpl newInnerNode(String path) { - return new DFSTopologyNodeImpl(path); - } - } - - /** - * The core data structure of this class. The information about what storage - * types this subtree has. Basically, a map whose key is a child - * id, value is a enum map including the counts of each storage type. e.g. - * DISK type has count 5 means there are 5 leaf datanodes with DISK type - * available. This value is set/updated upon datanode joining and leaving. - * - * NOTE : It might be sufficient to keep only a map from storage type - * to count, omitting the child node id. But this might make it hard to keep - * consistency when there are updates from children. - * - * For example, if currently R has two children A and B with storage X, Y, and - * A : X=1 Y=1 - * B : X=2 Y=2 - * so we store X=3 Y=3 as total on R. - * - * Now say A has a new X plugged in and becomes X=2 Y=1. - * - * If we know that "A adds one X", it is easy to update R by +1 on X. However, - * if we don't know "A adds one X", but instead got "A now has X=2 Y=1", - * (which seems to be the case in current heartbeat) we will not know how to - * update R. While if we store on R "A has X=1 and Y=1" then we can simply - * update R by completely replacing the A entry and all will be good. 
- */ - private final HashMap - <String, EnumMap<StorageType, Integer>> childrenStorageInfo; - - DFSTopologyNodeImpl(String path) { - super(path); - childrenStorageInfo = new HashMap<>(); - } - - DFSTopologyNodeImpl( - String name, String location, InnerNode parent, int level) { - super(name, location, parent, level); - childrenStorageInfo = new HashMap<>(); - } - - int getNumOfChildren() { - return children.size(); - } - - @Override - public boolean add(Node n) { - if (!isAncestor(n)) { - throw new IllegalArgumentException(n.getName() - + ", which is located at " + n.getNetworkLocation() - + ", is not a descendant of " + getPath(this)); - } - // In HDFS topology, the leaf node should always be DatanodeDescriptor - if (!(n instanceof DatanodeDescriptor)) { - throw new IllegalArgumentException("Unexpected node type " - + n.getClass().getName()); - } - DatanodeDescriptor dnDescriptor = (DatanodeDescriptor) n; - if (isParent(n)) { - // this node is the parent of n; add n directly - n.setParent(this); - n.setLevel(this.level + 1); - Node prev = childrenMap.put(n.getName(), n); - if (prev != null) { - for(int i=0; i<children.size(); i++) { - if (children.get(i).getName().equals(n.getName())) { - children.set(i, n); - return false; - } - } - } - children.add(n); - numOfLeaves++; - synchronized (childrenStorageInfo) { - if (!childrenStorageInfo.containsKey(dnDescriptor.getName())) { - childrenStorageInfo.put( - dnDescriptor.getName(), - new EnumMap<StorageType, Integer>(StorageType.class)); - } - for (StorageType st : dnDescriptor.getStorageTypes()) { - childrenStorageInfo.get(dnDescriptor.getName()).put(st, 1); - } - } - return true; - } else { - // find the next ancestor node - String parentName = getNextAncestorName(n); - InnerNode parentNode = (InnerNode)childrenMap.get(parentName); - if (parentNode == null) { - // create a new InnerNode - parentNode = createParentNode(parentName); - children.add(parentNode); - childrenMap.put(parentNode.getName(), parentNode); - } - // add n to the subtree of the next ancestor node - if (parentNode.add(n)) { - numOfLeaves++; - synchronized (childrenStorageInfo) { - if (!childrenStorageInfo.containsKey(parentNode.getName())) { - childrenStorageInfo.put( - parentNode.getName(), - new EnumMap<StorageType, Integer>(StorageType.class)); - for (StorageType st : dnDescriptor.getStorageTypes()) { - childrenStorageInfo.get(parentNode.getName()).put(st, 1); - } - } else { - EnumMap<StorageType, Integer> currentCount = - childrenStorageInfo.get(parentNode.getName()); - for (StorageType st : dnDescriptor.getStorageTypes()) { - if (currentCount.containsKey(st)) { - currentCount.put(st, currentCount.get(st) + 1); - } else { - currentCount.put(st, 1); - } - } - } - } - return true; - } else { - return false; - } - } - } - - @VisibleForTesting - HashMap <String, EnumMap<StorageType, Integer>> getChildrenStorageInfo() { - return childrenStorageInfo; - } - - - private DFSTopologyNodeImpl createParentNode(String parentName) { - return new DFSTopologyNodeImpl( - parentName, getPath(this), this, this.getLevel() + 1); - } - - @Override - public boolean remove(Node n) { - if (!isAncestor(n)) { - throw new IllegalArgumentException(n.getName() - + ", which is located at " + n.getNetworkLocation() - + ", is not a descendant of " + getPath(this)); - } - // In HDFS topology, the leaf node should always be DatanodeDescriptor - if (!(n instanceof DatanodeDescriptor)) { - throw new IllegalArgumentException("Unexpected node type " - + n.getClass().getName()); - } - DatanodeDescriptor 
dnDescriptor = (DatanodeDescriptor) n; - if (isParent(n)) { - // this node is the parent of n; remove n directly - if (childrenMap.containsKey(n.getName())) { - for (int i=0; i<children.size(); i++) { - if (children.get(i).getName().equals(n.getName())) { - children.remove(i); - childrenMap.remove(n.getName()); - synchronized (childrenStorageInfo) { - childrenStorageInfo.remove(dnDescriptor.getName()); - } - numOfLeaves--; - n.setParent(null); - return true; - } - } - } - return false; - } else { - // find the next ancestor node: the parent node - String parentName = getNextAncestorName(n); - DFSTopologyNodeImpl parentNode = - (DFSTopologyNodeImpl)childrenMap.get(parentName); - if (parentNode == null) { - return false; - } - // remove n from the parent node - boolean isRemoved = parentNode.remove(n); - if (isRemoved) { - // if the parent node has no children, remove the parent node too - synchronized (childrenStorageInfo) { - EnumMap<StorageType, Integer> currentCount = - childrenStorageInfo.get(parentNode.getName()); - EnumSet<StorageType> toRemove = EnumSet.noneOf(StorageType.class); - for (StorageType st : dnDescriptor.getStorageTypes()) { - int newCount = currentCount.get(st) - 1; - if (newCount == 0) { - toRemove.add(st); - } - currentCount.put(st, newCount); - } - for (StorageType st : toRemove) { - currentCount.remove(st); - } - } - if (parentNode.getNumOfChildren() == 0) { - for(int i=0; i < children.size(); i++) { - if (children.get(i).getName().equals(parentName)) { - children.remove(i); - childrenMap.remove(parentName); - childrenStorageInfo.remove(parentNode.getName()); - break; - } - } - } - numOfLeaves--; - } - return isRemoved; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ea11df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSNetworkTopology.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSNetworkTopology.java deleted file mode 100644 index ac1edf9..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSNetworkTopology.java +++ /dev/null @@ -1,260 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.Timeout; - -import java.util.EnumMap; -import java.util.HashMap; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -/** - * This class tests the correctness of storage type info stored in - * DFSNetworkTopology. - */ -public class TestDFSNetworkTopology { - private static final Log LOG = - LogFactory.getLog(TestDFSNetworkTopology.class); - private final static DFSNetworkTopology CLUSTER = - DFSNetworkTopology.getInstance(new Configuration()); - private DatanodeDescriptor[] dataNodes; - - @Rule - public Timeout testTimeout = new Timeout(30000); - - @Before - public void setupDatanodes() { - final String[] racks = { - "/l1/d1/r1", "/l1/d1/r1", "/l1/d1/r2", "/l1/d1/r2", "/l1/d1/r2", - - "/l1/d2/r3", "/l1/d2/r3", "/l1/d2/r3", - - "/l2/d3/r1", "/l2/d3/r2", "/l2/d3/r3", "/l2/d3/r4", "/l2/d3/r5", - - "/l2/d4/r1", "/l2/d4/r1", "/l2/d4/r1", "/l2/d4/r1", "/l2/d4/r1", - "/l2/d4/r1", "/l2/d4/r1"}; - final String[] hosts = { - "host1", "host2", "host3", "host4", "host5", - "host6", "host7", "host8", "host9", "host10", - "host11", "host12", "host13", "host14", "host15", - "host16", "host17", "host18", "host19", "host20"}; - final StorageType[] types = { - StorageType.ARCHIVE, StorageType.DISK, StorageType.ARCHIVE, - StorageType.DISK, StorageType.DISK, - - StorageType.DISK, StorageType.RAM_DISK, StorageType.SSD, - - StorageType.DISK, StorageType.RAM_DISK, StorageType.DISK, - StorageType.ARCHIVE, StorageType.ARCHIVE, - - StorageType.DISK, StorageType.DISK, StorageType.RAM_DISK, - StorageType.RAM_DISK, StorageType.ARCHIVE, StorageType.ARCHIVE, - StorageType.SSD}; - final DatanodeStorageInfo[] storages = - DFSTestUtil.createDatanodeStorageInfos(20, racks, hosts, types); - dataNodes = DFSTestUtil.toDatanodeDescriptor(storages); - for (int i = 0; i < dataNodes.length; i++) { - CLUSTER.add(dataNodes[i]); - } - dataNodes[9].setDecommissioned(); - dataNodes[10].setDecommissioned(); - } - - /** - * Test getting the storage type info of subtree. 
- * @throws Exception - */ - @Test - public void testGetStorageTypeInfo() throws Exception { - // checking level = 2 nodes - DFSTopologyNodeImpl d1 = - (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d1"); - HashMap<String, EnumMap<StorageType, Integer>> d1info = - d1.getChildrenStorageInfo(); - assertEquals(2, d1info.keySet().size()); - assertTrue(d1info.get("r1").size() == 2 && d1info.get("r2").size() == 2); - assertEquals(1, (int)d1info.get("r1").get(StorageType.DISK)); - assertEquals(1, (int)d1info.get("r1").get(StorageType.ARCHIVE)); - assertEquals(2, (int)d1info.get("r2").get(StorageType.DISK)); - assertEquals(1, (int)d1info.get("r2").get(StorageType.ARCHIVE)); - - DFSTopologyNodeImpl d2 = - (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d2"); - HashMap<String, EnumMap<StorageType, Integer>> d2info = - d2.getChildrenStorageInfo(); - assertEquals(1, d2info.keySet().size()); - assertTrue(d2info.get("r3").size() == 3); - assertEquals(1, (int)d2info.get("r3").get(StorageType.DISK)); - assertEquals(1, (int)d2info.get("r3").get(StorageType.RAM_DISK)); - assertEquals(1, (int)d2info.get("r3").get(StorageType.SSD)); - - DFSTopologyNodeImpl d3 = - (DFSTopologyNodeImpl) CLUSTER.getNode("/l2/d3"); - HashMap<String, EnumMap<StorageType, Integer>> d3info = - d3.getChildrenStorageInfo(); - assertEquals(5, d3info.keySet().size()); - assertEquals(1, (int)d3info.get("r1").get(StorageType.DISK)); - assertEquals(1, (int)d3info.get("r2").get(StorageType.RAM_DISK)); - assertEquals(1, (int)d3info.get("r3").get(StorageType.DISK)); - assertEquals(1, (int)d3info.get("r4").get(StorageType.ARCHIVE)); - assertEquals(1, (int)d3info.get("r5").get(StorageType.ARCHIVE)); - - DFSTopologyNodeImpl d4 = - (DFSTopologyNodeImpl) CLUSTER.getNode("/l2/d4"); - HashMap<String, EnumMap<StorageType, Integer>> d4info = - d4.getChildrenStorageInfo(); - assertEquals(1, d4info.keySet().size()); - assertEquals(2, (int)d4info.get("r1").get(StorageType.DISK)); - assertEquals(2, (int)d4info.get("r1").get(StorageType.RAM_DISK)); - assertEquals(2, (int)d4info.get("r1").get(StorageType.ARCHIVE)); - assertEquals(1, (int)d4info.get("r1").get(StorageType.SSD)); - - DFSTopologyNodeImpl l1 = - (DFSTopologyNodeImpl) CLUSTER.getNode("/l1"); - HashMap<String, EnumMap<StorageType, Integer>> l1info = - l1.getChildrenStorageInfo(); - assertEquals(2, l1info.keySet().size()); - assertTrue(l1info.get("d1").size() == 2 - && l1info.get("d2").size() == 3); - assertEquals(2, (int)l1info.get("d1").get(StorageType.ARCHIVE)); - assertEquals(3, (int)l1info.get("d1").get(StorageType.DISK)); - assertEquals(1, (int)l1info.get("d2").get(StorageType.DISK)); - assertEquals(1, (int)l1info.get("d2").get(StorageType.RAM_DISK)); - assertEquals(1, (int)l1info.get("d2").get(StorageType.SSD)); - - // checking level = 1 nodes - DFSTopologyNodeImpl l2 = - (DFSTopologyNodeImpl) CLUSTER.getNode("/l2"); - HashMap<String, EnumMap<StorageType, Integer>> l2info = - l2.getChildrenStorageInfo(); - assertTrue(l2info.get("d3").size() == 3 - && l2info.get("d4").size() == 4); - assertEquals(2, l2info.keySet().size()); - assertEquals(2, (int)l2info.get("d3").get(StorageType.DISK)); - assertEquals(2, (int)l2info.get("d3").get(StorageType.ARCHIVE)); - assertEquals(1, (int)l2info.get("d3").get(StorageType.RAM_DISK)); - assertEquals(2, (int)l2info.get("d4").get(StorageType.DISK)); - assertEquals(2, (int)l2info.get("d4").get(StorageType.ARCHIVE)); - assertEquals(2, (int)l2info.get("d4").get(StorageType.RAM_DISK)); - assertEquals(1, (int)l2info.get("d4").get(StorageType.SSD)); - } - - /** - * Test the 
correctness of storage type info when nodes are added and removed. - * @throws Exception - */ - @Test - public void testAddAndRemoveTopology() throws Exception { - String[] newRack = {"/l1/d1/r1", "/l1/d1/r3", "/l1/d3/r3", "/l1/d3/r3"}; - String[] newHost = {"nhost1", "nhost2", "nhost3", "nhost4"}; - String[] newips = {"30.30.30.30", "31.31.31.31", "32.32.32.32", - "33.33.33.33"}; - StorageType[] newTypes = {StorageType.DISK, StorageType.SSD, - StorageType.SSD, StorageType.SSD}; - DatanodeDescriptor[] newDD = new DatanodeDescriptor[4]; - - for (int i = 0; i<4; i++) { - DatanodeStorageInfo dsi = DFSTestUtil.createDatanodeStorageInfo( - "s" + newHost[i], newips[i], newRack[i], newHost[i], - newTypes[i], null); - newDD[i] = dsi.getDatanodeDescriptor(); - CLUSTER.add(newDD[i]); - } - - DFSTopologyNodeImpl d1 = - (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d1"); - HashMap<String, EnumMap<StorageType, Integer>> d1info = - d1.getChildrenStorageInfo(); - assertEquals(3, d1info.keySet().size()); - assertTrue(d1info.get("r1").size() == 2 && d1info.get("r2").size() == 2 - && d1info.get("r3").size() == 1); - assertEquals(2, (int)d1info.get("r1").get(StorageType.DISK)); - assertEquals(1, (int)d1info.get("r1").get(StorageType.ARCHIVE)); - assertEquals(2, (int)d1info.get("r2").get(StorageType.DISK)); - assertEquals(1, (int)d1info.get("r2").get(StorageType.ARCHIVE)); - assertEquals(1, (int)d1info.get("r3").get(StorageType.SSD)); - - DFSTopologyNodeImpl d3 = - (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d3"); - HashMap<String, EnumMap<StorageType, Integer>> d3info = - d3.getChildrenStorageInfo(); - assertEquals(1, d3info.keySet().size()); - assertTrue(d3info.get("r3").size() == 1); - assertEquals(2, (int)d3info.get("r3").get(StorageType.SSD)); - - DFSTopologyNodeImpl l1 = - (DFSTopologyNodeImpl) CLUSTER.getNode("/l1"); - HashMap<String, EnumMap<StorageType, Integer>> l1info = - l1.getChildrenStorageInfo(); - assertEquals(3, l1info.keySet().size()); - assertTrue(l1info.get("d1").size() == 3 && - l1info.get("d2").size() == 3 && l1info.get("d3").size() == 1); - assertEquals(4, (int)l1info.get("d1").get(StorageType.DISK)); - assertEquals(2, (int)l1info.get("d1").get(StorageType.ARCHIVE)); - assertEquals(1, (int)l1info.get("d1").get(StorageType.SSD)); - assertEquals(1, (int)l1info.get("d2").get(StorageType.SSD)); - assertEquals(1, (int)l1info.get("d2").get(StorageType.RAM_DISK)); - assertEquals(1, (int)l1info.get("d2").get(StorageType.DISK)); - assertEquals(2, (int)l1info.get("d3").get(StorageType.SSD)); - - - for (int i = 0; i<4; i++) { - CLUSTER.remove(newDD[i]); - } - - // /d1/r3 should've been out, /d1/r1 should've been resumed - DFSTopologyNodeImpl nd1 = - (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d1"); - HashMap<String, EnumMap<StorageType, Integer>> nd1info = - nd1.getChildrenStorageInfo(); - assertEquals(2, nd1info.keySet().size()); - assertTrue(nd1info.get("r1").size() == 2 && nd1info.get("r2").size() == 2); - assertEquals(1, (int)nd1info.get("r1").get(StorageType.DISK)); - assertEquals(1, (int)nd1info.get("r1").get(StorageType.ARCHIVE)); - assertEquals(2, (int)nd1info.get("r2").get(StorageType.DISK)); - assertEquals(1, (int)nd1info.get("r2").get(StorageType.ARCHIVE)); - - // /l1/d3 should've been out, and /l1/d1 should've been resumed - DFSTopologyNodeImpl nl1 = - (DFSTopologyNodeImpl) CLUSTER.getNode("/l1"); - HashMap<String, EnumMap<StorageType, Integer>> nl1info = - nl1.getChildrenStorageInfo(); - assertEquals(2, nl1info.keySet().size()); - assertTrue(l1info.get("d1").size() == 2 - && 
l1info.get("d2").size() == 3); - assertEquals(2, (int)nl1info.get("d1").get(StorageType.ARCHIVE)); - assertEquals(3, (int)nl1info.get("d1").get(StorageType.DISK)); - assertEquals(1, (int)l1info.get("d2").get(StorageType.DISK)); - assertEquals(1, (int)l1info.get("d2").get(StorageType.RAM_DISK)); - assertEquals(1, (int)l1info.get("d2").get(StorageType.SSD)); - - assertNull(CLUSTER.getNode("/l1/d3")); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ea11df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/net/TestDFSNetworkTopology.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/net/TestDFSNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/net/TestDFSNetworkTopology.java new file mode 100644 index 0000000..32ecf886 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/net/TestDFSNetworkTopology.java @@ -0,0 +1,449 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.net; + +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; +import org.apache.hadoop.net.Node; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * This class tests the correctness of storage type info stored in + * DFSNetworkTopology. 
+ */ +public class TestDFSNetworkTopology { + private static final Log LOG = + LogFactory.getLog(TestDFSNetworkTopology.class); + private final static DFSNetworkTopology CLUSTER = + DFSNetworkTopology.getInstance(new Configuration()); + private DatanodeDescriptor[] dataNodes; + + @Rule + public Timeout testTimeout = new Timeout(30000); + + @Before + public void setupDatanodes() { + final String[] racks = { + "/l1/d1/r1", "/l1/d1/r1", "/l1/d1/r2", "/l1/d1/r2", "/l1/d1/r2", + + "/l1/d2/r3", "/l1/d2/r3", "/l1/d2/r3", + + "/l2/d3/r1", "/l2/d3/r2", "/l2/d3/r3", "/l2/d3/r4", "/l2/d3/r5", + + "/l2/d4/r1", "/l2/d4/r1", "/l2/d4/r1", "/l2/d4/r1", "/l2/d4/r1", + "/l2/d4/r1", "/l2/d4/r1"}; + final String[] hosts = { + "host1", "host2", "host3", "host4", "host5", + "host6", "host7", "host8", + "host9", "host10", "host11", "host12", "host13", + "host14", "host15", "host16", "host17", "host18", "host19", "host20"}; + final StorageType[] types = { + StorageType.ARCHIVE, StorageType.DISK, StorageType.ARCHIVE, + StorageType.DISK, StorageType.DISK, + + StorageType.DISK, StorageType.RAM_DISK, StorageType.SSD, + + StorageType.DISK, StorageType.RAM_DISK, StorageType.DISK, + StorageType.ARCHIVE, StorageType.ARCHIVE, + + StorageType.DISK, StorageType.DISK, StorageType.RAM_DISK, + StorageType.RAM_DISK, StorageType.ARCHIVE, StorageType.ARCHIVE, + StorageType.SSD}; + final DatanodeStorageInfo[] storages = + DFSTestUtil.createDatanodeStorageInfos(20, racks, hosts, types); + dataNodes = DFSTestUtil.toDatanodeDescriptor(storages); + for (int i = 0; i < dataNodes.length; i++) { + CLUSTER.add(dataNodes[i]); + } + dataNodes[9].setDecommissioned(); + dataNodes[10].setDecommissioned(); + } + + /** + * Test getting the storage type info of subtree. + * @throws Exception + */ + @Test + public void testGetStorageTypeInfo() throws Exception { + // checking level = 2 nodes + DFSTopologyNodeImpl d1 = + (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d1"); + HashMap<String, EnumMap<StorageType, Integer>> d1info = + d1.getChildrenStorageInfo(); + assertEquals(2, d1info.keySet().size()); + assertTrue(d1info.get("r1").size() == 2 && d1info.get("r2").size() == 2); + assertEquals(1, (int)d1info.get("r1").get(StorageType.DISK)); + assertEquals(1, (int)d1info.get("r1").get(StorageType.ARCHIVE)); + assertEquals(2, (int)d1info.get("r2").get(StorageType.DISK)); + assertEquals(1, (int)d1info.get("r2").get(StorageType.ARCHIVE)); + + DFSTopologyNodeImpl d2 = + (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d2"); + HashMap<String, EnumMap<StorageType, Integer>> d2info = + d2.getChildrenStorageInfo(); + assertEquals(1, d2info.keySet().size()); + assertTrue(d2info.get("r3").size() == 3); + assertEquals(1, (int)d2info.get("r3").get(StorageType.DISK)); + assertEquals(1, (int)d2info.get("r3").get(StorageType.RAM_DISK)); + assertEquals(1, (int)d2info.get("r3").get(StorageType.SSD)); + + DFSTopologyNodeImpl d3 = + (DFSTopologyNodeImpl) CLUSTER.getNode("/l2/d3"); + HashMap<String, EnumMap<StorageType, Integer>> d3info = + d3.getChildrenStorageInfo(); + assertEquals(5, d3info.keySet().size()); + assertEquals(1, (int)d3info.get("r1").get(StorageType.DISK)); + assertEquals(1, (int)d3info.get("r2").get(StorageType.RAM_DISK)); + assertEquals(1, (int)d3info.get("r3").get(StorageType.DISK)); + assertEquals(1, (int)d3info.get("r4").get(StorageType.ARCHIVE)); + assertEquals(1, (int)d3info.get("r5").get(StorageType.ARCHIVE)); + + DFSTopologyNodeImpl d4 = + (DFSTopologyNodeImpl) CLUSTER.getNode("/l2/d4"); + HashMap<String, EnumMap<StorageType, Integer>> d4info = + 
d4.getChildrenStorageInfo(); + assertEquals(1, d4info.keySet().size()); + assertEquals(2, (int)d4info.get("r1").get(StorageType.DISK)); + assertEquals(2, (int)d4info.get("r1").get(StorageType.RAM_DISK)); + assertEquals(2, (int)d4info.get("r1").get(StorageType.ARCHIVE)); + assertEquals(1, (int)d4info.get("r1").get(StorageType.SSD)); + + DFSTopologyNodeImpl l1 = + (DFSTopologyNodeImpl) CLUSTER.getNode("/l1"); + HashMap<String, EnumMap<StorageType, Integer>> l1info = + l1.getChildrenStorageInfo(); + assertEquals(2, l1info.keySet().size()); + assertTrue(l1info.get("d1").size() == 2 + && l1info.get("d2").size() == 3); + assertEquals(2, (int)l1info.get("d1").get(StorageType.ARCHIVE)); + assertEquals(3, (int)l1info.get("d1").get(StorageType.DISK)); + assertEquals(1, (int)l1info.get("d2").get(StorageType.DISK)); + assertEquals(1, (int)l1info.get("d2").get(StorageType.RAM_DISK)); + assertEquals(1, (int)l1info.get("d2").get(StorageType.SSD)); + + // checking level = 1 nodes + DFSTopologyNodeImpl l2 = + (DFSTopologyNodeImpl) CLUSTER.getNode("/l2"); + HashMap<String, EnumMap<StorageType, Integer>> l2info = + l2.getChildrenStorageInfo(); + assertTrue(l2info.get("d3").size() == 3 + && l2info.get("d4").size() == 4); + assertEquals(2, l2info.keySet().size()); + assertEquals(2, (int)l2info.get("d3").get(StorageType.DISK)); + assertEquals(2, (int)l2info.get("d3").get(StorageType.ARCHIVE)); + assertEquals(1, (int)l2info.get("d3").get(StorageType.RAM_DISK)); + assertEquals(2, (int)l2info.get("d4").get(StorageType.DISK)); + assertEquals(2, (int)l2info.get("d4").get(StorageType.ARCHIVE)); + assertEquals(2, (int)l2info.get("d4").get(StorageType.RAM_DISK)); + assertEquals(1, (int)l2info.get("d4").get(StorageType.SSD)); + } + + /** + * Test the correctness of storage type info when nodes are added and removed. 
+ * @throws Exception + */ + @Test + public void testAddAndRemoveTopology() throws Exception { + String[] newRack = {"/l1/d1/r1", "/l1/d1/r3", "/l1/d3/r3", "/l1/d3/r3"}; + String[] newHost = {"nhost1", "nhost2", "nhost3", "nhost4"}; + String[] newips = {"30.30.30.30", "31.31.31.31", "32.32.32.32", + "33.33.33.33"}; + StorageType[] newTypes = {StorageType.DISK, StorageType.SSD, + StorageType.SSD, StorageType.SSD}; + DatanodeDescriptor[] newDD = new DatanodeDescriptor[4]; + + for (int i = 0; i<4; i++) { + DatanodeStorageInfo dsi = DFSTestUtil.createDatanodeStorageInfo( + "s" + newHost[i], newips[i], newRack[i], newHost[i], + newTypes[i], null); + newDD[i] = dsi.getDatanodeDescriptor(); + CLUSTER.add(newDD[i]); + } + + DFSTopologyNodeImpl d1 = + (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d1"); + HashMap<String, EnumMap<StorageType, Integer>> d1info = + d1.getChildrenStorageInfo(); + assertEquals(3, d1info.keySet().size()); + assertTrue(d1info.get("r1").size() == 2 && d1info.get("r2").size() == 2 + && d1info.get("r3").size() == 1); + assertEquals(2, (int)d1info.get("r1").get(StorageType.DISK)); + assertEquals(1, (int)d1info.get("r1").get(StorageType.ARCHIVE)); + assertEquals(2, (int)d1info.get("r2").get(StorageType.DISK)); + assertEquals(1, (int)d1info.get("r2").get(StorageType.ARCHIVE)); + assertEquals(1, (int)d1info.get("r3").get(StorageType.SSD)); + + DFSTopologyNodeImpl d3 = + (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d3"); + HashMap<String, EnumMap<StorageType, Integer>> d3info = + d3.getChildrenStorageInfo(); + assertEquals(1, d3info.keySet().size()); + assertTrue(d3info.get("r3").size() == 1); + assertEquals(2, (int)d3info.get("r3").get(StorageType.SSD)); + + DFSTopologyNodeImpl l1 = + (DFSTopologyNodeImpl) CLUSTER.getNode("/l1"); + HashMap<String, EnumMap<StorageType, Integer>> l1info = + l1.getChildrenStorageInfo(); + assertEquals(3, l1info.keySet().size()); + assertTrue(l1info.get("d1").size() == 3 && + l1info.get("d2").size() == 3 && l1info.get("d3").size() == 1); + assertEquals(4, (int)l1info.get("d1").get(StorageType.DISK)); + assertEquals(2, (int)l1info.get("d1").get(StorageType.ARCHIVE)); + assertEquals(1, (int)l1info.get("d1").get(StorageType.SSD)); + assertEquals(1, (int)l1info.get("d2").get(StorageType.SSD)); + assertEquals(1, (int)l1info.get("d2").get(StorageType.RAM_DISK)); + assertEquals(1, (int)l1info.get("d2").get(StorageType.DISK)); + assertEquals(2, (int)l1info.get("d3").get(StorageType.SSD)); + + + for (int i = 0; i<4; i++) { + CLUSTER.remove(newDD[i]); + } + + // /d1/r3 should've been out, /d1/r1 should've been resumed + DFSTopologyNodeImpl nd1 = + (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d1"); + HashMap<String, EnumMap<StorageType, Integer>> nd1info = + nd1.getChildrenStorageInfo(); + assertEquals(2, nd1info.keySet().size()); + assertTrue(nd1info.get("r1").size() == 2 && nd1info.get("r2").size() == 2); + assertEquals(1, (int)nd1info.get("r1").get(StorageType.DISK)); + assertEquals(1, (int)nd1info.get("r1").get(StorageType.ARCHIVE)); + assertEquals(2, (int)nd1info.get("r2").get(StorageType.DISK)); + assertEquals(1, (int)nd1info.get("r2").get(StorageType.ARCHIVE)); + + // /l1/d3 should've been out, and /l1/d1 should've been resumed + DFSTopologyNodeImpl nl1 = + (DFSTopologyNodeImpl) CLUSTER.getNode("/l1"); + HashMap<String, EnumMap<StorageType, Integer>> nl1info = + nl1.getChildrenStorageInfo(); + assertEquals(2, nl1info.keySet().size()); + assertTrue(l1info.get("d1").size() == 2 + && l1info.get("d2").size() == 3); + assertEquals(2, 
+        (int)nl1info.get("d1").get(StorageType.ARCHIVE));
+    assertEquals(3, (int)nl1info.get("d1").get(StorageType.DISK));
+    assertEquals(1, (int)nl1info.get("d2").get(StorageType.DISK));
+    assertEquals(1, (int)nl1info.get("d2").get(StorageType.RAM_DISK));
+    assertEquals(1, (int)nl1info.get("d2").get(StorageType.SSD));
+
+    assertNull(CLUSTER.getNode("/l1/d3"));
+  }
+
+  @Test
+  public void testChooseRandomWithStorageType() throws Exception {
+    Node n;
+    DatanodeDescriptor dd;
+    // test that chooseRandom can return nodes with the desired storage type
+    // when nothing is excluded
+    Set<String> diskUnderL1 =
+        Sets.newHashSet("host2", "host4", "host5", "host6");
+    Set<String> archiveUnderL1 = Sets.newHashSet("host1", "host3");
+    Set<String> ramdiskUnderL1 = Sets.newHashSet("host7");
+    Set<String> ssdUnderL1 = Sets.newHashSet("host8");
+    for (int i = 0; i < 10; i++) {
+      n = CLUSTER.chooseRandomWithStorageType("/l1", null, null,
+          StorageType.DISK);
+      assertTrue(n instanceof DatanodeDescriptor);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(diskUnderL1.contains(dd.getHostName()));
+
+      n = CLUSTER.chooseRandomWithStorageType("/l1", null, null,
+          StorageType.RAM_DISK);
+      assertTrue(n instanceof DatanodeDescriptor);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(ramdiskUnderL1.contains(dd.getHostName()));
+
+      n = CLUSTER.chooseRandomWithStorageType("/l1", null, null,
+          StorageType.ARCHIVE);
+      assertTrue(n instanceof DatanodeDescriptor);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(archiveUnderL1.contains(dd.getHostName()));
+
+      n = CLUSTER.chooseRandomWithStorageType("/l1", null, null,
+          StorageType.SSD);
+      assertTrue(n instanceof DatanodeDescriptor);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(ssdUnderL1.contains(dd.getHostName()));
+    }
+  }
+
+  @Test
+  public void testChooseRandomWithStorageTypeWithExcluded() throws Exception {
+    Node n;
+    DatanodeDescriptor dd;
+    // the tests below exercise chooseRandom with excludes; under /l2/d3,
+    // every rack has exactly one host
+    // /l2/d3 has five racks r[1~5] but only r4 and r5 have ARCHIVE
+    // host12 is the one under "/l2/d3/r4", host13 is the one under "/l2/d3/r5"
+    n = CLUSTER.chooseRandomWithStorageType("/l2/d3/r4", null, null,
+        StorageType.ARCHIVE);
+    HashSet<Node> excluded = new HashSet<>();
+    // exclude the host on r4 (since there is only one host, no randomness here)
+    excluded.add(n);
+
+    for (int i = 0; i<10; i++) {
+      n = CLUSTER.chooseRandomWithStorageType("/l2/d3", null, null,
+          StorageType.ARCHIVE);
+      assertTrue(n instanceof DatanodeDescriptor);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(dd.getHostName().equals("host12") ||
+          dd.getHostName().equals("host13"));
+    }
+
+    // test excluded nodes
+    for (int i = 0; i<10; i++) {
+      n = CLUSTER.chooseRandomWithStorageType("/l2/d3", null, excluded,
+          StorageType.ARCHIVE);
+      assertTrue(n instanceof DatanodeDescriptor);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(dd.getHostName().equals("host13"));
+    }
+
+    // test excluded scope
+    for (int i = 0; i<10; i++) {
+      n = CLUSTER.chooseRandomWithStorageType("/l2/d3", "/l2/d3/r4", null,
+          StorageType.ARCHIVE);
+      assertTrue(n instanceof DatanodeDescriptor);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(dd.getHostName().equals("host13"));
+    }
+
+    // test excluded scope + excluded node, expecting a null return
+    for (int i = 0; i<10; i++) {
+      n = CLUSTER.chooseRandomWithStorageType("/l2/d3", "/l2/d3/r5", excluded,
+          StorageType.ARCHIVE);
+      assertNull(n);
+    }
+
+    // test excluded scope + excluded node, expecting a non-null return
+    n = CLUSTER.chooseRandomWithStorageType("/l1/d2", null, null,
+        StorageType.DISK);
+    dd = (DatanodeDescriptor)n;
+    assertEquals("host6", dd.getHostName());
+    // exclude host6, the only DISK host under /l1/d2 (so no randomness here)
+    excluded.add(n);
+    Set<String> expectedSet = Sets.newHashSet("host4", "host5");
+    for (int i = 0; i<10; i++) {
+      // under l1, there are four hosts with DISK:
+      // /l1/d1/r1/host2, /l1/d1/r2/host4, /l1/d1/r2/host5 and /l1/d2/r3/host6
+      // host6 is an excluded node, host2 is under the excluded scope /l1/d1/r1,
+      // so it should always return host4 or host5
+      n = CLUSTER.chooseRandomWithStorageType(
+          "/l1", "/l1/d1/r1", excluded, StorageType.DISK);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(expectedSet.contains(dd.getHostName()));
+    }
+  }
+
+
+  /**
+   * Tests the wrapper method, which takes a single scope argument: if the
+   * scope starts with a ~, it is treated as an excluded scope and the search
+   * always starts from the root; otherwise it is the scope to search within.
+   * @throws Exception
+   */
+  @Test
+  public void testChooseRandomWithStorageTypeWrapper() throws Exception {
+    Node n;
+    DatanodeDescriptor dd;
+    n = CLUSTER.chooseRandomWithStorageType("/l2/d3/r4", null, null,
+        StorageType.ARCHIVE);
+    HashSet<Node> excluded = new HashSet<>();
+    // exclude the host on r4 (since there is only one host, no randomness here)
+    excluded.add(n);
+
+    // search with the given scope being the desired scope
+    for (int i = 0; i<10; i++) {
+      n = CLUSTER.chooseRandomWithStorageType(
+          "/l2/d3", null, StorageType.ARCHIVE);
+      assertTrue(n instanceof DatanodeDescriptor);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(dd.getHostName().equals("host12") ||
+          dd.getHostName().equals("host13"));
+    }
+
+    for (int i = 0; i<10; i++) {
+      n = CLUSTER.chooseRandomWithStorageType(
+          "/l2/d3", excluded, StorageType.ARCHIVE);
+      assertTrue(n instanceof DatanodeDescriptor);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(dd.getHostName().equals("host13"));
+    }
+
+    // search with the given scope being an excluded scope
+
+    // a total of 4 ramdisk nodes:
+    // /l1/d2/r3/host7, /l2/d3/r2/host10, /l2/d4/r1/host7 and /l2/d4/r1/host10
+    // so if we exclude /l2/d4/r1, it should always be either host7 or host10
+    for (int i = 0; i<10; i++) {
+      n = CLUSTER.chooseRandomWithStorageType(
+          "~/l2/d4", null, StorageType.RAM_DISK);
+      assertTrue(n instanceof DatanodeDescriptor);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(dd.getHostName().equals("host7") ||
+          dd.getHostName().equals("host10"));
+    }
+
+    // similar to above, except that we also exclude host10 here, so it should
+    // always be host7
+    n = CLUSTER.chooseRandomWithStorageType("/l2/d3/r2", null, null,
+        StorageType.RAM_DISK);
+    // add host10 to the excluded set
+    excluded.add(n);
+    for (int i = 0; i<10; i++) {
+      n = CLUSTER.chooseRandomWithStorageType(
+          "~/l2/d4", excluded, StorageType.RAM_DISK);
+      assertTrue(n instanceof DatanodeDescriptor);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(dd.getHostName().equals("host7"));
+    }
+  }
+
+  @Test
+  public void testNonExistingNode() throws Exception {
+    Node n;
+    n = CLUSTER.chooseRandomWithStorageType(
+        "/l100", null, null, StorageType.DISK);
+    assertNull(n);
+    n = CLUSTER.chooseRandomWithStorageType(
+        "/l100/d100", null, null, StorageType.DISK);
+    assertNull(n);
+    n = CLUSTER.chooseRandomWithStorageType(
+        "/l100/d100/r100", null, null, StorageType.DISK);
+    assertNull(n);
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
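
For readers skimming the patch, the short sketch below (illustrative only, not
part of the commit) shows how the chooseRandomWithStorageType calls exercised
by the tests above might be driven by a caller. It assumes an already-populated
DFSNetworkTopology named topology; the class name ChooseRandomSketch and the
method pickNodes are made up for illustration, and only the call shapes that
appear in the tests are used.

// Illustrative sketch only -- not part of this patch. Placed in the same
// package as the topology class, mirroring the test, in case the overloads
// used here are not public in this change.
package org.apache.hadoop.hdfs.net;

import java.util.HashSet;

import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.net.Node;

public class ChooseRandomSketch {
  /** topology is assumed to be already populated with datanodes. */
  static Node pickNodes(DFSNetworkTopology topology) {
    // Full form: scope, excluded scope, excluded nodes, storage type.
    // Picks a random node under /l1 that has DISK storage.
    Node disk = topology.chooseRandomWithStorageType(
        "/l1", null, null, StorageType.DISK);

    // Wrapper form: a single scope argument plus excluded nodes. A leading
    // '~' turns the argument into an excluded scope, and the search then
    // always starts from the root.
    HashSet<Node> excluded = new HashSet<>();
    if (disk != null) {
      excluded.add(disk);
    }
    return topology.chooseRandomWithStorageType(
        "~/l1/d1", excluded, StorageType.ARCHIVE);
  }
}

As testNonExistingNode and the fully-excluded cases above show, the call
returns null when no node in the requested scope can supply the storage type,
so callers should be prepared for a null result.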