HDFS-9420. Add DataModels for DiskBalancer. Contributed by Anu Engineer
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/91a5c481 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/91a5c481 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/91a5c481 Branch: refs/heads/trunk Commit: 91a5c4814381a4d4c3ce9b29a1f85299e03be835 Parents: 0b9edf6 Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com> Authored: Mon Nov 23 19:07:42 2015 -0800 Committer: Arpit Agarwal <a...@apache.org> Committed: Thu Jun 23 18:18:48 2016 -0700 ---------------------------------------------------------------------- .../hadoop-hdfs/HDFS-1312_CHANGES.txt | 5 + .../connectors/ClusterConnector.java | 44 +++ .../diskbalancer/connectors/package-info.java | 29 ++ .../datamodel/DiskBalancerCluster.java | 249 ++++++++++++++ .../datamodel/DiskBalancerDataNode.java | 269 +++++++++++++++ .../datamodel/DiskBalancerVolume.java | 330 +++++++++++++++++++ .../datamodel/DiskBalancerVolumeSet.java | 325 ++++++++++++++++++ .../diskbalancer/datamodel/package-info.java | 31 ++ .../hdfs/server/diskbalancer/package-info.java | 36 ++ .../diskbalancer/DiskBalancerTestUtil.java | 227 +++++++++++++ .../server/diskbalancer/TestDataModels.java | 224 +++++++++++++ .../diskbalancer/connectors/NullConnector.java | 59 ++++ 12 files changed, 1828 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt new file mode 100644 index 0000000..5a71032 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt @@ -0,0 +1,5 @@ +HDFS-1312 Change Log + + NEW FEATURES + + HDFS-9420. Add DataModels for DiskBalancer. (Anu Engineer via szetszwo) http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/ClusterConnector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/ClusterConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/ClusterConnector.java new file mode 100644 index 0000000..3dbfec2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/ClusterConnector.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.hdfs.server.diskbalancer.connectors; + +import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode; + +import java.util.List; + +/** + * ClusterConnector interface hides all specifics about how we communicate to + * the HDFS cluster. This interface returns data in classes that diskbalancer + * understands. + */ +public interface ClusterConnector { + + /** + * getNodes function returns a list of DiskBalancerDataNodes. + * + * @return Array of DiskBalancerDataNodes + */ + List<DiskBalancerDataNode> getNodes() throws Exception; + + /** + * Returns info about the connector. + * + * @return String. + */ + String getConnectorInfo(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/package-info.java new file mode 100644 index 0000000..8164804 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/package-info.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdfs.server.diskbalancer.connectors; + +/** + * Connectors package is a set of logical connectors that connect + * to various data sources to read the hadoop cluster information. + * + * We currently have 1 connector in this package. it is + * + * NullConnector - This is an in-memory connector that is useful in testing. + * we can crate dataNodes on the fly and attach to this connector and + * ask the diskBalancer Cluster to read data from this source. + */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java new file mode 100644 index 0000000..91f7eaa --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdfs.server.diskbalancer.datamodel; + +import com.google.common.base.Preconditions; +import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector; +import org.codehaus.jackson.annotate.JsonIgnore; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.map.ObjectMapper; + +import java.io.File; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +/** + * DiskBalancerCluster represents the nodes that we are working against. + * <p/> + * Please Note : + * <p/> + * Semantics of inclusionList and exclusionLists. + * <p/> + * If a non-empty inclusionList is specified then the diskBalancer assumes that + * the user is only interested in processing that list of nodes. This node list + * is checked against the exclusionList and only the nodes in inclusionList but + * not in exclusionList is processed. + * <p/> + * if inclusionList is empty, then we assume that all live nodes in the nodes is + * to be processed by diskBalancer. In that case diskBalancer will avoid any + * nodes specified in the exclusionList but will process all nodes in the + * cluster. + * <p/> + * In other words, an empty inclusionList is means all the nodes otherwise + * only a given list is processed and ExclusionList is always honored. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class DiskBalancerCluster { + + static final Log LOG = LogFactory.getLog(DiskBalancerCluster.class); + private final Set<String> exclusionList; + private final Set<String> inclusionList; + private ClusterConnector clusterConnector; + private List<DiskBalancerDataNode> nodes; + private String outputpath; + + @JsonIgnore + private List<DiskBalancerDataNode> nodesToProcess; + private float threshold; + + /** + * Empty Constructor needed by Jackson. + */ + public DiskBalancerCluster() { + nodes = new LinkedList<>(); + exclusionList = new TreeSet<>(); + inclusionList = new TreeSet<>(); + + } + + /** + * Constructs a DiskBalancerCluster. + * + * @param connector - ClusterConnector + * @throws IOException + */ + public DiskBalancerCluster(ClusterConnector connector) throws IOException { + Preconditions.checkNotNull(connector); + clusterConnector = connector; + exclusionList = new TreeSet<>(); + inclusionList = new TreeSet<>(); + } + + /** + * Parses a Json string and converts to DiskBalancerCluster. 
+ * + * @param json - Json String + * @return DiskBalancerCluster + * @throws IOException + */ + public static DiskBalancerCluster parseJson(String json) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(json, DiskBalancerCluster.class); + } + + /** + * readClusterInfo connects to the cluster and reads the node's data. This + * data is used as basis of rest of computation in DiskBalancerCluster + */ + public void readClusterInfo() throws Exception { + Preconditions.checkNotNull(clusterConnector); + LOG.info("Using connector : " + clusterConnector.getConnectorInfo()); + nodes = clusterConnector.getNodes(); + } + + /** + * Gets all DataNodes in the Cluster. + * + * @return Array of DisKBalancerDataNodes + */ + public List<DiskBalancerDataNode> getNodes() { + return nodes; + } + + /** + * Sets the list of nodes of this cluster. + * + * @param clusterNodes List of Nodes + */ + public void setNodes(List<DiskBalancerDataNode> clusterNodes) { + this.nodes = clusterNodes; + } + + /** + * Returns the current ExclusionList. + * + * @return List of Nodes that are excluded from diskBalancer right now. + */ + public Set<String> getExclusionList() { + return exclusionList; + } + + /** + * sets the list of nodes to exclude from process of diskBalancer. + * + * @param excludedNodes - exclusionList of nodes. + */ + public void setExclusionList(Set<String> excludedNodes) { + this.exclusionList.addAll(excludedNodes); + } + + /** + * Returns the threshold value. This is used for indicating how much skew is + * acceptable, This is expressed as a percentage. For example to say 20% skew + * between volumes is acceptable set this value to 20. + * + * @return float + */ + public float getThreshold() { + return threshold; + } + + /** + * Sets the threshold value. + * + * @param thresholdPercent - float - in percentage + */ + public void setThreshold(float thresholdPercent) { + Preconditions.checkState((thresholdPercent >= 0.0f) && + (thresholdPercent <= 100.0f), "A percentage value expected."); + this.threshold = thresholdPercent; + } + + /** + * Gets the Inclusion list. + * + * @return List of machine to be processed by diskBalancer. + */ + public Set<String> getInclusionList() { + return inclusionList; + } + + /** + * Sets the inclusionList. + * + * @param includeNodes - set of machines to be processed by diskBalancer. + */ + public void setInclusionList(Set<String> includeNodes) { + this.inclusionList.addAll(includeNodes); + } + + /** + * returns a serialized json string. + * + * @return String - json + * @throws IOException + */ + public String toJson() throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(this); + } + + /** + * Returns the Nodes to Process which is the real list of nodes processed by + * diskBalancer. + * + * @return List of DiskBalancerDataNodes + */ + @JsonIgnore + public List<DiskBalancerDataNode> getNodesToProcess() { + return nodesToProcess; + } + + /** + * Sets the nodes to process. + * + * @param dnNodesToProcess - List of DataNodes to process + */ + @JsonIgnore + public void setNodesToProcess(List<DiskBalancerDataNode> dnNodesToProcess) { + this.nodesToProcess = dnNodesToProcess; + } + + /** + * Returns th output path for this cluster. + */ + public String getOutput() { + return outputpath; + } + + /** + * Sets the output path for this run. 
+ * + * @param output - Path + */ + public void setOutput(String output) { + this.outputpath = output; + } + + /** + * Writes a snapshot of the cluster to the specified directory. + * + * @param snapShotName - name of the snapshot + */ + public void createSnapshot(String snapShotName) throws IOException { + String json = this.toJson(); + File outFile = new File(getOutput() + "/" + snapShotName); + FileUtils.writeStringToFile(outFile, json); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java new file mode 100644 index 0000000..87030db --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdfs.server.diskbalancer.datamodel; + +import com.google.common.base.Preconditions; + +import java.util.HashMap; +import java.util.Map; + +/** + * DiskBalancerDataNode represents a DataNode that exists in the cluster. It + * also contains a metric called nodeDataDensity which allows us to compare + * between a set of Nodes. + */ +public class DiskBalancerDataNode implements Comparable<DiskBalancerDataNode> { + private float nodeDataDensity; + private Map<String, DiskBalancerVolumeSet> volumeSets; + private String dataNodeUUID; + private String dataNodeIP; + private int dataNodePort; + private String dataNodeName; + private int volumeCount; + + /** + * Constructs an Empty Data Node. + */ + public DiskBalancerDataNode() { + } + + /** + * Constructs a DataNode. + * + * @param dataNodeID - Node ID + */ + public DiskBalancerDataNode(String dataNodeID) { + this.dataNodeUUID = dataNodeID; + volumeSets = new HashMap<>(); + } + + /** + * Returns the IP address of this Node. + * + * @return IP Address string + */ + public String getDataNodeIP() { + return dataNodeIP; + } + + /** + * Sets the IP address of this Node. + * + * @param ipaddress - IP Address + */ + public void setDataNodeIP(String ipaddress) { + this.dataNodeIP = ipaddress; + } + + /** + * Returns the Port of this DataNode. + * + * @return Port Number + */ + public int getDataNodePort() { + return dataNodePort; + } + + /** + * Sets the DataNode Port number. 
+ * + * @param port - Datanode Port Number + */ + public void setDataNodePort(int port) { + this.dataNodePort = port; + } + + /** + * Get DataNode DNS name. + * + * @return name of the node + */ + public String getDataNodeName() { + return dataNodeName; + } + + /** + * Sets node's DNS name. + * + * @param name - Data node name + */ + public void setDataNodeName(String name) { + this.dataNodeName = name; + } + + /** + * Returns the Volume sets on this node. + * + * @return a Map of VolumeSets + */ + public Map<String, DiskBalancerVolumeSet> getVolumeSets() { + return volumeSets; + } + + /** + * Returns datanode ID. + **/ + public String getDataNodeUUID() { + return dataNodeUUID; + } + + /** + * Sets Datanode UUID. + * + * @param nodeID - Node ID. + */ + public void setDataNodeUUID(String nodeID) { + this.dataNodeUUID = nodeID; + } + + /** + * Indicates whether some other object is "equal to" this one. + */ + @Override + public boolean equals(Object obj) { + if ((obj == null) || (obj.getClass() != getClass())) { + return false; + } + DiskBalancerDataNode that = (DiskBalancerDataNode) obj; + return dataNodeUUID.equals(that.getDataNodeUUID()); + } + + /** + * Compares this object with the specified object for order. Returns a + * negative integer, zero, or a positive integer as this object is less than, + * equal to, or greater than the specified object. + * + * @param that the object to be compared. + * @return a negative integer, zero, or a positive integer as this object is + * less than, equal to, or greater than the specified object. + * @throws NullPointerException if the specified object is null + * @throws ClassCastException if the specified object's type prevents it + * from being compared to this object. + */ + @Override + public int compareTo(DiskBalancerDataNode that) { + Preconditions.checkNotNull(that); + + if (Float.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0) + < 0) { + return -1; + } + + if (Float.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0) + == 0) { + return 0; + } + + if (Float.compare(this.nodeDataDensity - that.getNodeDataDensity(), 0) + > 0) { + return 1; + } + return 0; + } + + /** + * Returns a hash code value for the object. This method is supported for the + * benefit of hash tables such as those provided by {@link HashMap}. + */ + @Override + public int hashCode() { + return super.hashCode(); + } + + /** + * returns NodeDataDensity Metric. + * + * @return float + */ + public float getNodeDataDensity() { + return nodeDataDensity; + } + + /** + * computes nodes data density. + * <p/> + * This metric allows us to compare different nodes and how well the data is + * spread across a set of volumes inside the node. + */ + public void computeNodeDensity() { + float sum = 0; + int volcount = 0; + for (DiskBalancerVolumeSet vset : volumeSets.values()) { + for (DiskBalancerVolume vol : vset.getVolumes()) { + sum += Math.abs(vol.getVolumeDataDensity()); + volcount++; + } + } + nodeDataDensity = sum; + this.volumeCount = volcount; + + } + + /** + * Computes if this node needs balancing at all. + * + * @param threshold - Percentage + * @return true or false + */ + public boolean isBalancingNeeded(float threshold) { + for (DiskBalancerVolumeSet vSet : getVolumeSets().values()) { + if (vSet.isBalancingNeeded(threshold)) { + return true; + } + } + return false; + } + + /** + * Adds a volume to the DataNode. + * <p/> + * it is assumed that we have one thread per node hence this call is not + * synchronised neither is the map is protected. 
+ * + * @param volume - volume + */ + public void addVolume(DiskBalancerVolume volume) throws Exception { + Preconditions.checkNotNull(volume, "volume cannot be null"); + Preconditions.checkNotNull(volumeSets, "volume sets cannot be null"); + Preconditions + .checkNotNull(volume.getStorageType(), "storage type cannot be null"); + + String volumeSetKey = volume.getStorageType(); + DiskBalancerVolumeSet vSet; + if (volumeSets.containsKey(volumeSetKey)) { + vSet = volumeSets.get(volumeSetKey); + } else { + vSet = new DiskBalancerVolumeSet(volume.isTransient()); + volumeSets.put(volumeSetKey, vSet); + } + + vSet.addVolume(volume); + computeNodeDensity(); + } + + /** + * Returns how many volumes are in the DataNode. + * + * @return int + */ + public int getVolumeCount() { + return volumeCount; + } + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java new file mode 100644 index 0000000..a608248 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java @@ -0,0 +1,330 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdfs.server.diskbalancer.datamodel; + +import com.google.common.base.Preconditions; +import org.apache.htrace.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.htrace.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.codehaus.jackson.map.ObjectMapper; + +import java.io.IOException; + +/** + * DiskBalancerVolume represents a volume in the DataNode. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class DiskBalancerVolume { + private String path; + private long capacity; + private String storageType; + private long used; + private long reserved; + private String uuid; + private boolean failed; + private boolean isTransient; + private float volumeDataDensity; + private boolean skip = false; + private boolean isReadOnly; + + /** + * Constructs DiskBalancerVolume. + */ + public DiskBalancerVolume() { + } + + /** + * Parses a Json string and converts to DiskBalancerVolume. 
+ * + * @param json - Json String + * + * @return DiskBalancerCluster + * + * @throws IOException + */ + public static DiskBalancerVolume parseJson(String json) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(json, DiskBalancerVolume.class); + } + + /** + * Get this volume Data Density + * Please see DiskBalancerVolumeSet#computeVolumeDataDensity to see how + * this is computed. + * + * @return float. + */ + public float getVolumeDataDensity() { + return volumeDataDensity; + } + + /** + * Sets this volume's data density. + * + * @param volDataDensity - density + */ + public void setVolumeDataDensity(float volDataDensity) { + this.volumeDataDensity = volDataDensity; + } + + /** + * Indicates if the volume is Transient in nature. + * + * @return true or false. + */ + public boolean isTransient() { + return isTransient; + } + + /** + * Sets volumes transient nature. + * + * @param aTransient - bool + */ + public void setTransient(boolean aTransient) { + this.isTransient = aTransient; + } + + /** + * Compares two volumes and decides if it is the same volume. + * + * @param o Volume Object + * + * @return boolean + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + DiskBalancerVolume that = (DiskBalancerVolume) o; + return uuid.equals(that.uuid); + } + + /** + * Computes hash code for a diskBalancerVolume. + * + * @return int + */ + @Override + public int hashCode() { + return uuid.hashCode(); + } + + /** + * Capacity of this volume. + * + * @return long + */ + public long getCapacity() { + return capacity; + } + + /** + * Sets the capacity of this volume. + * + * @param totalCapacity long + */ + public void setCapacity(long totalCapacity) { + this.capacity = totalCapacity; + } + + /** + * Indicates if this is a failed volume. + * + * @return boolean + */ + public boolean isFailed() { + return failed; + } + + /** + * Sets the failed flag for this volume. + * + * @param fail boolean + */ + public void setFailed(boolean fail) { + this.failed = fail; + } + + /** + * Returns the path for this volume. + * + * @return String + */ + public String getPath() { + return path; + } + + /** + * Sets the path for this volume. + * + * @param volPath Path + */ + public void setPath(String volPath) { + this.path = volPath; + } + + /** + * Gets the reserved size for this volume. + * + * @return Long - Reserved size. + */ + public long getReserved() { + return reserved; + } + + /** + * Sets the reserved size. + * + * @param reservedSize -- Sets the reserved. + */ + public void setReserved(long reservedSize) { + this.reserved = reservedSize; + } + + /** + * Gets the StorageType. + * + * @return String StorageType. + */ + public String getStorageType() { + return storageType; + } + + /** + * Sets the StorageType. + * + * @param typeOfStorage - Storage Type String. + */ + public void setStorageType(String typeOfStorage) { + this.storageType = typeOfStorage; + } + + /** + * Gets the dfsUsed Size. + * + * @return - long - used space + */ + public long getUsed() { + return used; + } + + /** + * Sets the used Space for Long. + * + * @param dfsUsedSpace - dfsUsedSpace for this volume. + */ + public void setUsed(long dfsUsedSpace) { + Preconditions.checkArgument(dfsUsedSpace < this.getCapacity()); + this.used = dfsUsedSpace; + } + + /** + * Gets the uuid for this volume. 
+ * + * @return String - uuid of th volume + */ + public String getUuid() { + return uuid; + } + + /** + * Sets the uuid for this volume. + * + * @param id - String + */ + public void setUuid(String id) { + this.uuid = id; + } + + /** + * Returns effective capacity of a volume. + * + * @return float - fraction that represents used capacity. + */ + @JsonIgnore + public long computeEffectiveCapacity() { + return getCapacity() - getReserved(); + } + + /** + * returns a Json String. + * + * @return String + * + * @throws IOException + */ + public String toJson() throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(this); + } + + /** + * returns if we should skip this volume. + * @return true / false + */ + public boolean isSkip() { + return skip; + } + + /** + * Sets the Skip value for this volume. + * @param skipValue bool + */ + public void setSkip(boolean skipValue) { + this.skip = skipValue; + } + + /** + * Returns the usedPercentage of a disk. + * This is useful in debugging disk usage + * @return float + */ + public float computeUsedPercentage() { + return (float) (getUsed()) / (float) (getCapacity()); + } + + /** + * Tells us if a volume is transient. + * @param transientValue + */ + public void setIsTransient(boolean transientValue) { + this.isTransient = transientValue; + } + + /** + * Tells us if this volume is read-only. + * @return true / false + */ + public boolean isReadOnly() { + return isReadOnly; + } + + /** + * Sets this volume as read only. + * @param readOnly - boolean + */ + public void setReadOnly(boolean readOnly) { + isReadOnly = readOnly; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java new file mode 100644 index 0000000..15c21ac --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.diskbalancer.datamodel; + +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.htrace.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.htrace.fasterxml.jackson.annotation.JsonProperty; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.UUID; + +/** + * DiskBalancerVolumeSet is a collection of storage devices on the + * data node which are of similar StorageType. + */ +@JsonIgnoreProperties({"sortedQueue", "volumeCount", "idealUsed"}) +public class DiskBalancerVolumeSet { + static final Log LOG = LogFactory.getLog(DiskBalancerVolumeSet.class); + private final int maxDisks = 256; + + @JsonProperty("transient") + private boolean isTransient; + private Set<DiskBalancerVolume> volumes; + + @JsonIgnore + private TreeSet<DiskBalancerVolume> sortedQueue; + private String storageType; + private String setID; + + private float idealUsed; + + + /** + * Constructs Empty DiskNBalanceVolumeSet. + * This is needed by jackson + */ + public DiskBalancerVolumeSet() { + setID = UUID.randomUUID().toString(); + } + + /** + * Constructs a DiskBalancerVolumeSet. + * + * @param isTransient - boolean + */ + public DiskBalancerVolumeSet(boolean isTransient) { + this.isTransient = isTransient; + volumes = new HashSet<>(maxDisks); + sortedQueue = new TreeSet<>(new MinHeap()); + this.storageType = null; + setID = UUID.randomUUID().toString(); + } + + /** + * Constructs a new DiskBalancerVolumeSet. + */ + public DiskBalancerVolumeSet(DiskBalancerVolumeSet volumeSet) { + this.isTransient = volumeSet.isTransient(); + this.storageType = volumeSet.storageType; + this.volumes = new HashSet<>(volumeSet.volumes); + sortedQueue = new TreeSet<>(new MinHeap()); + setID = UUID.randomUUID().toString(); + } + + /** + * Tells us if this volumeSet is transient. + * + * @return - true or false + */ + @JsonProperty("transient") + public boolean isTransient() { + return isTransient; + } + + /** + * Set the transient properties for this volumeSet. + * + * @param transientValue - Boolean + */ + @JsonProperty("transient") + public void setTransient(boolean transientValue) { + this.isTransient = transientValue; + } + + /** + * Computes Volume Data Density. Adding a new volume changes + * the volumeDataDensity for all volumes. So we throw away + * our priority queue and recompute everything. + * + * we discard failed volumes from this computation. + * + * totalCapacity = totalCapacity of this volumeSet + * totalUsed = totalDfsUsed for this volumeSet + * idealUsed = totalUsed / totalCapacity + * dfsUsedRatio = dfsUsedOnAVolume / Capacity On that Volume + * volumeDataDensity = idealUsed - dfsUsedRatio + */ + public void computeVolumeDataDensity() { + long totalCapacity = 0; + long totalUsed = 0; + sortedQueue.clear(); + + // when we plan to re-distribute data we need to make + // sure that we skip failed volumes. 
+ for (DiskBalancerVolume volume : volumes) { + if (!volume.isFailed() && !volume.isSkip()) { + + if (volume.computeEffectiveCapacity() < 0) { + skipMisConfiguredVolume(volume); + continue; + } + + totalCapacity += volume.computeEffectiveCapacity(); + totalUsed += volume.getUsed(); + } + } + + if (totalCapacity != 0) { + this.idealUsed = totalUsed / (float) totalCapacity; + } + + for (DiskBalancerVolume volume : volumes) { + if (!volume.isFailed() && !volume.isSkip()) { + float dfsUsedRatio = + volume.getUsed() / (float) volume.computeEffectiveCapacity(); + volume.setVolumeDataDensity(this.idealUsed - dfsUsedRatio); + sortedQueue.add(volume); + } + } + } + + private void skipMisConfiguredVolume(DiskBalancerVolume volume) { + //probably points to some sort of mis-configuration. Log this and skip + // processing this volume. + String errMessage = String.format("Real capacity is negative." + + "This usually points to some " + + "kind of mis-configuration.%n" + + "Capacity : %d Reserved : %d " + + "realCap = capacity - " + + "reserved = %d.%n" + + "Skipping this volume from " + + "all processing. type : %s id" + + " :%s", + volume.getCapacity(), + volume.getReserved(), + volume.computeEffectiveCapacity(), + volume.getStorageType(), + volume.getUuid()); + + LOG.fatal(errMessage); + volume.setSkip(true); + } + + /** + * Returns the number of volumes in the Volume Set. + * + * @return int + */ + @JsonIgnore + public int getVolumeCount() { + return volumes.size(); + } + + /** + * Get Storage Type. + * + * @return String + */ + public String getStorageType() { + return storageType; + } + + /** + * Set Storage Type. + * @param typeOfStorage -- StorageType + */ + public void setStorageType(String typeOfStorage) { + this.storageType = typeOfStorage; + } + + /** + * adds a given volume into this volume set. + * + * @param volume - volume to add. + * + * @throws Exception + */ + public void addVolume(DiskBalancerVolume volume) throws Exception { + Preconditions.checkNotNull(volume, "volume cannot be null"); + Preconditions.checkState(isTransient() == volume.isTransient(), + "Mismatch in volumeSet and volume's transient " + + "properties."); + + + if (this.storageType == null) { + Preconditions.checkState(volumes.size() == 0L, "Storage Type is Null but" + + " volume size is " + volumes.size()); + this.storageType = volume.getStorageType(); + } else { + Preconditions.checkState(this.storageType.equals(volume.getStorageType()), + "Adding wrong type of disk to this volume set"); + } + volumes.add(volume); + computeVolumeDataDensity(); + + } + + /** + * Returns a list diskVolumes that are part of this volume set. + * + * @return List + */ + public List<DiskBalancerVolume> getVolumes() { + return new ArrayList<>(volumes); + } + + + @JsonIgnore + public TreeSet<DiskBalancerVolume> getSortedQueue() { + return sortedQueue; + } + + /** + * Computes whether we need to do any balancing on this volume Set at all. + * It checks if any disks are out of threshold value + * + * @param thresholdPercentage - threshold - in percentage + * + * @return true if balancing is needed false otherwise. + */ + public boolean isBalancingNeeded(float thresholdPercentage) { + float threshold = thresholdPercentage / 100.0f; + + if(volumes == null || volumes.size() <= 1) { + // there is nothing we can do with a single volume. + // so no planning needed. 
+ return false; + } + + for (DiskBalancerVolume vol : volumes) { + boolean notSkip = !vol.isFailed() && !vol.isTransient() && !vol.isSkip(); + if ((Math.abs(vol.getVolumeDataDensity()) > threshold) && notSkip) { + return true; + } + } + return false; + } + + /** + * Remove a volume from the current set. + * + * This call does not recompute the volumeDataDensity. It has to be + * done manually after this call. + * + * @param volume - Volume to remove + */ + public void removeVolume(DiskBalancerVolume volume) { + volumes.remove(volume); + sortedQueue.remove(volume); + } + + /** + * Get Volume Set ID. + * @return String + */ + public String getSetID() { + return setID; + } + + /** + * Set VolumeSet ID. + * @param volID String + */ + public void setSetID(String volID) { + this.setID = volID; + } + + /** + * Gets the idealUsed for this volume set. + */ + + @JsonIgnore + public float getIdealUsed() { + return this.idealUsed; + } + + static class MinHeap implements Comparator<DiskBalancerVolume>, Serializable { + + /** + * Compares its two arguments for order. Returns a negative integer, + * zero, or a positive integer as the first argument is less than, equal + * to, or greater than the second. + */ + @Override + public int compare(DiskBalancerVolume first, DiskBalancerVolume second) { + return Float + .compare(second.getVolumeDataDensity(), first.getVolumeDataDensity()); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/package-info.java new file mode 100644 index 0000000..f72e283 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/package-info.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.diskbalancer.datamodel; +/** + * Disk Balancer Data Model is the Data Model for the cluster that + * Disk Balancer is working against. This information is read + * directly from NameNode or from a user supplied json model file. + * + * Here is the overview of the model maintained by diskBalancer. + * + * DiskBalancerCluster is a list of DiskBalancerDataNodes. + * DiskBalancerDataNodes is a collection of DiskBalancerVolumeSets + * DiskBalancerVolumeSets is a collection of DiskBalancerVolumes + * DiskBalancerVolumes represents actual volumes on DataNodes. 
+ */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/package-info.java new file mode 100644 index 0000000..4bec98f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/package-info.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.diskbalancer; +/** + * Disk Balancer connects to a {@link org.apache.hadoop.hdfs.server.datanode + * .DataNode} and attempts to spread data across all volumes evenly. + * + * This is achieved by : + * + * 1) Calculating the average data that should be on a set of volumes grouped + * by the type. For example, how much data should be on each volume of SSDs on a + * machine. + * + * 2) Once we know the average data that is expected to be on a volume we + * move data from volumes with higher than average load to volumes with + * less than average load. + * + * 3) Disk Balancer operates against data nodes which are live and operational. + * + */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java new file mode 100644 index 0000000..5e3f4bf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdfs.server.diskbalancer; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.server.diskbalancer.connectors.NullConnector; +import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster; +import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode; +import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume; +import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet; +import org.apache.hadoop.util.Time; + +import java.util.Random; +import java.util.UUID; + +/** + * Helper class to create various cluster configrations at run time. + */ +public class DiskBalancerTestUtil { + // we modeling disks here, hence HDD style units + public static final long GB = 1000000000L; + public static final long TB = 1000000000000L; + private static int[] diskSizes = + {1, 2, 3, 4, 5, 6, 7, 8, 9, 100, 200, 300, 400, 500, 600, 700, 800, 900}; + Random rand; + private String stringTable = + "ABCDEDFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0987654321"; + + /** + * Constructs a util class. + */ + public DiskBalancerTestUtil() { + this.rand = new Random(Time.monotonicNow()); + } + + /** + * Returns a random string. + * + * @param length - Number of chars in the string + * + * @return random String + */ + private String getRandomName(int length) { + StringBuilder name = new StringBuilder(); + for (int x = 0; x < length; x++) { + name.append(stringTable.charAt(rand.nextInt(stringTable.length()))); + } + return name.toString(); + } + + /** + * Returns a Random Storage Type. + * + * @return - StorageType + */ + private StorageType getRandomStorageType() { + return StorageType.parseStorageType(rand.nextInt(3)); + } + + /** + * Returns random capacity, if the size is smaller than 10 + * they are TBs otherwise the size is assigned to GB range. + * + * @return Long - Disk Size + */ + private long getRandomCapacity() { + int size = diskSizes[rand.nextInt(diskSizes.length)]; + if (size < 10) { + return size * TB; + } else { + return size * GB; + } + } + + /** + * Some value under 20% in these tests. + */ + private long getRandomReserved(long capacity) { + double rcap = capacity * 0.2d; + double randDouble = rand.nextDouble(); + double temp = randDouble * rcap; + return (new Double(temp)).longValue(); + + } + + /** + * Some value less that capacity - reserved. + */ + private long getRandomDfsUsed(long capacity, long reserved) { + double rcap = capacity - reserved; + double randDouble = rand.nextDouble(); + double temp = randDouble * rcap; + return (new Double(temp)).longValue(); + } + + /** + * Creates a Random Volume of a specific storageType. + * + * @return Volume + */ + public DiskBalancerVolume createRandomVolume() { + return createRandomVolume(getRandomStorageType()); + } + + /** + * Creates a Random Volume for testing purpose. 
+ * + * @param type - StorageType + * + * @return DiskBalancerVolume + */ + public DiskBalancerVolume createRandomVolume(StorageType type) { + DiskBalancerVolume volume = new DiskBalancerVolume(); + volume.setPath("/tmp/disk/" + getRandomName(10)); + volume.setStorageType(type.toString()); + volume.setTransient(type.isTransient()); + + volume.setCapacity(getRandomCapacity()); + volume.setReserved(getRandomReserved(volume.getCapacity())); + volume + .setUsed(getRandomDfsUsed(volume.getCapacity(), volume.getReserved())); + volume.setUuid(UUID.randomUUID().toString()); + return volume; + } + + /** + * Creates a RandomVolumeSet. + * + * @param type -Storage Type + * @param diskCount - How many disks you need. + * + * @return volumeSet + * + * @throws Exception + */ + public DiskBalancerVolumeSet createRandomVolumeSet(StorageType type, + int diskCount) + throws Exception { + + Preconditions.checkState(diskCount > 0); + DiskBalancerVolumeSet volumeSet = + new DiskBalancerVolumeSet(type.isTransient()); + for (int x = 0; x < diskCount; x++) { + volumeSet.addVolume(createRandomVolume(type)); + } + assert (volumeSet.getVolumeCount() == diskCount); + return volumeSet; + } + + /** + * Creates a RandomDataNode. + * + * @param diskTypes - Storage types needed in the Node + * @param diskCount - Disk count - that many disks of each type is created + * + * @return DataNode + * + * @throws Exception + */ + public DiskBalancerDataNode createRandomDataNode(StorageType[] diskTypes, + int diskCount) + throws Exception { + Preconditions.checkState(diskTypes.length > 0); + Preconditions.checkState(diskCount > 0); + + DiskBalancerDataNode node = + new DiskBalancerDataNode(UUID.randomUUID().toString()); + + for (StorageType t : diskTypes) { + DiskBalancerVolumeSet vSet = createRandomVolumeSet(t, diskCount); + for (DiskBalancerVolume v : vSet.getVolumes()) { + node.addVolume(v); + } + } + return node; + } + + /** + * Creates a RandomCluster. + * + * @param dataNodeCount - How many nodes you need + * @param diskTypes - StorageTypes you need in each node + * @param diskCount - How many disks you need of each type. + * + * @return Cluster + * + * @throws Exception + */ + public DiskBalancerCluster createRandCluster(int dataNodeCount, + StorageType[] diskTypes, + int diskCount) + + throws Exception { + Preconditions.checkState(diskTypes.length > 0); + Preconditions.checkState(diskCount > 0); + Preconditions.checkState(dataNodeCount > 0); + NullConnector nullConnector = new NullConnector(); + DiskBalancerCluster cluster = new DiskBalancerCluster(nullConnector); + + // once we add these nodes into the connector, cluster will read them + // from the connector. 
+ for (int x = 0; x < dataNodeCount; x++) { + nullConnector.addNode(createRandomDataNode(diskTypes, diskCount)); + } + + // with this call we have populated the cluster info + cluster.readClusterInfo(); + return cluster; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDataModels.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDataModels.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDataModels.java new file mode 100644 index 0000000..3507c96 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDataModels.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdfs.server.diskbalancer; + +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster; +import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode; +import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume; +import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.TreeSet; +import java.util.UUID; + +public class TestDataModels { + @Test + public void TestCreateRandomVolume() throws Exception { + DiskBalancerTestUtil util = new DiskBalancerTestUtil(); + DiskBalancerVolume vol = util.createRandomVolume(StorageType.DISK); + Assert.assertNotNull(vol.getUuid()); + Assert.assertNotNull(vol.getPath()); + Assert.assertNotNull(vol.getStorageType()); + Assert.assertFalse(vol.isFailed()); + Assert.assertFalse(vol.isTransient()); + Assert.assertTrue(vol.getCapacity() > 0); + Assert.assertTrue((vol.getCapacity() - vol.getReserved()) > 0); + Assert.assertTrue((vol.getReserved() + vol.getUsed()) < vol.getCapacity()); + } + + @Test + public void TestCreateRandomVolumeSet() throws Exception { + DiskBalancerTestUtil util = new DiskBalancerTestUtil(); + DiskBalancerVolumeSet vSet = + util.createRandomVolumeSet(StorageType.SSD, 10); + Assert.assertEquals(10, vSet.getVolumeCount()); + Assert.assertEquals(StorageType.SSD.toString(), + vSet.getVolumes().get(0).getStorageType()); + + } + + @Test + public void TestCreateRandomDataNode() throws Exception { + DiskBalancerTestUtil util = new DiskBalancerTestUtil(); + DiskBalancerDataNode node = util.createRandomDataNode( + new StorageType[]{StorageType.DISK, 
StorageType.RAM_DISK}, 10); + Assert.assertNotNull(node.getNodeDataDensity()); + } + + @Test + public void TestDiskQueues() throws Exception { + DiskBalancerTestUtil util = new DiskBalancerTestUtil(); + DiskBalancerDataNode node = util.createRandomDataNode( + new StorageType[]{StorageType.DISK, StorageType.RAM_DISK}, 3); + TreeSet<DiskBalancerVolume> sortedQueue = + node.getVolumeSets().get(StorageType.DISK.toString()).getSortedQueue(); + + List<DiskBalancerVolume> reverseList = new LinkedList<>(); + List<DiskBalancerVolume> highList = new LinkedList<>(); + int queueSize = sortedQueue.size(); + for (int x = 0; x < queueSize; x++) { + reverseList.add(sortedQueue.first()); + highList.add(sortedQueue.first()); + } + Collections.reverse(reverseList); + + for (int x = 0; x < queueSize; x++) { + + Assert.assertEquals(reverseList.get(x).getCapacity(), + highList.get(x).getCapacity()); + Assert.assertEquals(reverseList.get(x).getReserved(), + highList.get(x).getReserved()); + Assert.assertEquals(reverseList.get(x).getUsed(), + highList.get(x).getUsed()); + } + } + + @Test + public void TestNoBalancingNeededEvenDataSpread() throws Exception { + DiskBalancerTestUtil util = new DiskBalancerTestUtil(); + DiskBalancerDataNode node = + new DiskBalancerDataNode(UUID.randomUUID().toString()); + + // create two disks which have exactly same data and isBalancing should + // say we don't need to balance. + DiskBalancerVolume v1 = util.createRandomVolume(StorageType.SSD); + v1.setCapacity(DiskBalancerTestUtil.TB); + v1.setReserved(100 * DiskBalancerTestUtil.GB); + v1.setUsed(500 * DiskBalancerTestUtil.GB); + + DiskBalancerVolume v2 = util.createRandomVolume(StorageType.SSD); + v2.setCapacity(DiskBalancerTestUtil.TB); + v2.setReserved(100 * DiskBalancerTestUtil.GB); + v2.setUsed(500 * DiskBalancerTestUtil.GB); + + node.addVolume(v1); + node.addVolume(v2); + + for (DiskBalancerVolumeSet vsets : node.getVolumeSets().values()) { + Assert.assertFalse(vsets.isBalancingNeeded(10.0f)); + } + } + + @Test + public void TestNoBalancingNeededTransientDisks() throws Exception { + DiskBalancerTestUtil util = new DiskBalancerTestUtil(); + DiskBalancerDataNode node = + new DiskBalancerDataNode(UUID.randomUUID().toString()); + + // create two disks which have different data sizes, but + // transient. isBalancing should say no balancing needed. + DiskBalancerVolume v1 = util.createRandomVolume(StorageType.RAM_DISK); + v1.setCapacity(DiskBalancerTestUtil.TB); + v1.setReserved(100 * DiskBalancerTestUtil.GB); + v1.setUsed(1 * DiskBalancerTestUtil.GB); + + DiskBalancerVolume v2 = util.createRandomVolume(StorageType.RAM_DISK); + v2.setCapacity(DiskBalancerTestUtil.TB); + v2.setReserved(100 * DiskBalancerTestUtil.GB); + v2.setUsed(500 * DiskBalancerTestUtil.GB); + + node.addVolume(v1); + node.addVolume(v2); + + for (DiskBalancerVolumeSet vsets : node.getVolumeSets().values()) { + Assert.assertFalse(vsets.isBalancingNeeded(10.0f)); + } + } + + @Test + public void TestNoBalancingNeededFailedDisks() throws Exception { + DiskBalancerTestUtil util = new DiskBalancerTestUtil(); + DiskBalancerDataNode node = + new DiskBalancerDataNode(UUID.randomUUID().toString()); + + // create two disks which have which are normal disks, but fail + // one of them. VolumeSet should say no balancing needed. 
+ DiskBalancerVolume v1 = util.createRandomVolume(StorageType.SSD); + v1.setCapacity(DiskBalancerTestUtil.TB); + v1.setReserved(100 * DiskBalancerTestUtil.GB); + v1.setUsed(1 * DiskBalancerTestUtil.GB); + v1.setFailed(true); + + DiskBalancerVolume v2 = util.createRandomVolume(StorageType.SSD); + v2.setCapacity(DiskBalancerTestUtil.TB); + v2.setReserved(100 * DiskBalancerTestUtil.GB); + v2.setUsed(500 * DiskBalancerTestUtil.GB); + + node.addVolume(v1); + node.addVolume(v2); + + for (DiskBalancerVolumeSet vsets : node.getVolumeSets().values()) { + Assert.assertFalse(vsets.isBalancingNeeded(10.0f)); + } + } + + @Test + public void TestNeedBalancingUnevenDataSpread() throws Exception { + DiskBalancerTestUtil util = new DiskBalancerTestUtil(); + DiskBalancerDataNode node = + new DiskBalancerDataNode(UUID.randomUUID().toString()); + + DiskBalancerVolume v1 = util.createRandomVolume(StorageType.SSD); + v1.setCapacity(DiskBalancerTestUtil.TB); + v1.setReserved(100 * DiskBalancerTestUtil.GB); + v1.setUsed(0); + + DiskBalancerVolume v2 = util.createRandomVolume(StorageType.SSD); + v2.setCapacity(DiskBalancerTestUtil.TB); + v2.setReserved(100 * DiskBalancerTestUtil.GB); + v2.setUsed(500 * DiskBalancerTestUtil.GB); + + node.addVolume(v1); + node.addVolume(v2); + + for (DiskBalancerVolumeSet vsets : node.getVolumeSets().values()) { + Assert.assertTrue(vsets.isBalancingNeeded(10.0f)); + } + } + + @Test + public void TestVolumeSerialize() throws Exception { + DiskBalancerTestUtil util = new DiskBalancerTestUtil(); + DiskBalancerVolume volume = util.createRandomVolume(StorageType.DISK); + String originalString = volume.toJson(); + DiskBalancerVolume parsedVolume = + DiskBalancerVolume.parseJson(originalString); + String parsedString = parsedVolume.toJson(); + Assert.assertEquals(originalString, parsedString); + } + + @Test + public void TestClusterSerialize() throws Exception { + DiskBalancerTestUtil util = new DiskBalancerTestUtil(); + + // Create a Cluster with 3 datanodes, 3 disk types and 3 disks in each type + // that is 9 disks in each machine. + DiskBalancerCluster cluster = util.createRandCluster(3, new StorageType[]{ + StorageType.DISK, StorageType.RAM_DISK, StorageType.SSD}, 3); + + DiskBalancerCluster newCluster = + DiskBalancerCluster.parseJson(cluster.toJson()); + Assert.assertEquals(cluster.getNodes(), newCluster.getNodes()); + Assert + .assertEquals(cluster.getNodes().size(), newCluster.getNodes().size()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/NullConnector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/NullConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/NullConnector.java new file mode 100644 index 0000000..3f78530 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/NullConnector.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdfs.server.diskbalancer.connectors; + +import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode; + +import java.util.LinkedList; +import java.util.List; + +/** + * This connector allows user to create an in-memory cluster + * and is useful in testing. + */ +public class NullConnector implements ClusterConnector { + private final List<DiskBalancerDataNode> nodes = new LinkedList<>(); + + /** + * getNodes function returns a list of DiskBalancerDataNodes. + * + * @return Array of DiskBalancerDataNodes + */ + @Override + public List<DiskBalancerDataNode> getNodes() throws Exception { + return nodes; + } + + /** + * Returns info about the connector. + * + * @return String. + */ + @Override + public String getConnectorInfo() { + return "Null Connector : No persistence, in-memory connector"; + } + + /** + * Allows user to add nodes into this connector. + * + * @param node - Node to add + */ + public void addNode(DiskBalancerDataNode node) { + nodes.add(node); + } +}
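----------------------------------------------------------------------

A minimal usage sketch of how the new data-model classes fit together, using only APIs introduced by this patch. The class name, volume paths, and sizes below are illustrative assumptions, and NullConnector lives in the test tree, so this compiles only with the test classes on the classpath; a production caller would supply a concrete ClusterConnector, which this commit does not yet include.

import org.apache.hadoop.hdfs.server.diskbalancer.connectors.NullConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;

import java.util.UUID;

public class DiskBalancerModelSketch {
  private static final long GB = 1000000000L; // same HDD-style unit as DiskBalancerTestUtil

  // Hypothetical helper: builds a DISK volume. Capacity is set before used
  // space because setUsed() requires used < capacity.
  private static DiskBalancerVolume newVolume(String path, long capacity, long used) {
    DiskBalancerVolume vol = new DiskBalancerVolume();
    vol.setUuid(UUID.randomUUID().toString());
    vol.setPath(path);
    vol.setStorageType("DISK"); // storage type keys the VolumeSet inside the node
    vol.setTransient(false);
    vol.setCapacity(capacity);
    vol.setReserved(0);
    vol.setUsed(used);
    return vol;
  }

  public static void main(String[] args) throws Exception {
    // A DiskBalancerDataNode groups its volumes into per-storage-type VolumeSets.
    DiskBalancerDataNode node =
        new DiskBalancerDataNode(UUID.randomUUID().toString());
    node.addVolume(newVolume("/data/disk1", 1000 * GB, 900 * GB));
    node.addVolume(newVolume("/data/disk2", 1000 * GB, 100 * GB));

    // NullConnector is the in-memory connector added for testing.
    NullConnector connector = new NullConnector();
    connector.addNode(node);

    // The cluster reads its node list from whatever connector it was given.
    DiskBalancerCluster cluster = new DiskBalancerCluster(connector);
    cluster.readClusterInfo();

    // 90% vs 10% used on the two DISK volumes exceeds a 10% density threshold.
    System.out.println("Balancing needed: " + node.isBalancingNeeded(10.0f));

    // The whole model serializes to JSON and back.
    String json = cluster.toJson();
    DiskBalancerCluster copy = DiskBalancerCluster.parseJson(json);
    System.out.println("Nodes after JSON round-trip: " + copy.getNodes().size());
  }
}

The JSON round-trip at the end is the same path TestDataModels exercises; it is what lets the disk balancer work from a user-supplied model file as well as from a live cluster.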