http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
new file mode 100644
index 0000000..0a595d5
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.placement.algorithms;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+/**
+ * SCMCommonPolicy implements a set of invariants which are common to all
+ * container placement policies and acts as a repository of helper
+ * functions shared by placement policies.
+ */
+public abstract class SCMCommonPolicy implements ContainerPlacementPolicy {
+  @VisibleForTesting
+  static final Logger LOG =
+      LoggerFactory.getLogger(SCMCommonPolicy.class);
+  private final NodeManager nodeManager;
+  private final Random rand;
+  private final Configuration conf;
+
+  /**
+   * Constructs SCM Common Policy Class.
+   *
+   * @param nodeManager NodeManager
+   * @param conf Configuration class.
+   */
+  public SCMCommonPolicy(NodeManager nodeManager, Configuration conf) {
+    this.nodeManager = nodeManager;
+    this.rand = new Random();
+    this.conf = conf;
+  }
+
+  /**
+   * Return node manager.
+   *
+   * @return node manager
+   */
+  public NodeManager getNodeManager() {
+    return nodeManager;
+  }
+
+  /**
+   * Returns the Random Object.
+   *
+   * @return rand
+   */
+  public Random getRand() {
+    return rand;
+  }
+
+  /**
+   * Get Config.
+   *
+   * @return Configuration
+   */
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Given the replication factor and size required, return a set of
+   * datanodes that satisfies the node count and size requirements.
+   * <p>
+   * Here are some invariants of container placement.
+   * <p>
+   * 1. We place containers only on healthy nodes.
+   * 2. We place containers on nodes with enough space for that container.
+   * 3. If a set of containers is requested, we either meet the required
+   * number of nodes or we fail that request.
+   *
+   * @param nodesRequired - number of datanodes required.
+   * @param sizeRequired - size required for the container or block.
+   * @return list of datanodes chosen.
+   * @throws SCMException SCM exception.
+   */
+  public List<DatanodeDetails> chooseDatanodes(int nodesRequired, final long
+      sizeRequired) throws SCMException {
+    List<DatanodeDetails> healthyNodes =
+        nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+    String msg;
+    if (healthyNodes.size() == 0) {
+      msg = "No healthy node found to allocate container.";
+      LOG.error(msg);
+      throw new SCMException(msg, SCMException.ResultCodes
+          .FAILED_TO_FIND_HEALTHY_NODES);
+    }
+
+    if (healthyNodes.size() < nodesRequired) {
+      msg = String.format("Not enough healthy nodes to allocate container. %d "
+              + " datanodes required. Found %d",
+          nodesRequired, healthyNodes.size());
+      LOG.error(msg);
+      throw new SCMException(msg,
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    }
+    List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d ->
+        hasEnoughSpace(d, sizeRequired)).collect(Collectors.toList());
+
+    if (healthyList.size() < nodesRequired) {
+      msg = String.format("Unable to find enough nodes that meet the space " +
+              "requirement of %d bytes in healthy node set." +
+              " Nodes required: %d Found: %d",
+          sizeRequired, nodesRequired, healthyList.size());
+      LOG.error(msg);
+      throw new SCMException(msg,
+          SCMException.ResultCodes.FAILED_TO_FIND_NODES_WITH_SPACE);
+    }
+
+    return healthyList;
+  }
+
+  /**
+   * Returns true if this node has enough space to meet our requirement.
+   *
+   * @param datanodeDetails DatanodeDetails
+   * @param sizeRequired - size required for the container or block.
+   * @return true if we have enough space.
+   */
+  private boolean hasEnoughSpace(DatanodeDetails datanodeDetails,
+                                 long sizeRequired) {
+    SCMNodeMetric nodeMetric = nodeManager.getNodeStat(datanodeDetails);
+    return (nodeMetric != null) && nodeMetric.get().getRemaining()
+        .hasResources(sizeRequired);
+  }
+
+  /**
+   * This function invokes the derived class's chooseNode function to build
+   * a list of nodes. It then verifies that the invoked policy was able to
+   * return the expected number of nodes.
+   *
+   * @param nodesRequired - Nodes Required
+   * @param healthyNodes - List of healthy candidate nodes.
+   * @return List of Datanodes that can be used for placement.
+   * @throws SCMException if the required number of nodes cannot be found.
+   */
+  public List<DatanodeDetails> getResultSet(
+      int nodesRequired, List<DatanodeDetails> healthyNodes)
+      throws SCMException {
+    List<DatanodeDetails> results = new LinkedList<>();
+    for (int x = 0; x < nodesRequired; x++) {
+      // invoke the choose function defined in the derived classes.
+      DatanodeDetails nodeId = chooseNode(healthyNodes);
+      if (nodeId != null) {
+        results.add(nodeId);
+      }
+    }
+
+    if (results.size() < nodesRequired) {
+      LOG.error("Unable to find the required number of healthy nodes that " +
+              "meet the criteria. Required nodes: {}, Found nodes: {}",
+          nodesRequired, results.size());
+      throw new SCMException("Unable to find required number of nodes.",
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    }
+    return results;
+  }
+
+  /**
+   * Choose a datanode according to the policy, this function is implemented
+   * by the actual policy class. For example, PlacementCapacity or
+   * PlacementRandom.
+   *
+   * @param healthyNodes - Set of healthy nodes we can choose from.
+   * @return DatanodeDetails
+   */
+  public abstract DatanodeDetails chooseNode(
+      List<DatanodeDetails> healthyNodes);
+
+}
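
For context, a minimal sketch of how a policy built on SCMCommonPolicy is
driven. The nodeManager and conf objects are assumed to be supplied by the
SCM runtime, and the concrete policy used here is the capacity-based one
introduced below:

    // Sketch only: nodeManager and conf come from the SCM runtime.
    SCMContainerPlacementCapacity policy =
        new SCMContainerPlacementCapacity(nodeManager, conf);
    try {
      // Ask for 3 healthy datanodes that can each hold a 5 GB container.
      List<DatanodeDetails> targets =
          policy.chooseDatanodes(3, 5L * 1024 * 1024 * 1024);
    } catch (SCMException e) {
      // Raised when fewer than 3 healthy nodes with enough space exist.
    }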

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
new file mode 100644
index 0000000..85a6b54
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.placement.algorithms;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Container placement policy that randomly choose datanodes with remaining
+ * space to satisfy the size constraints.
+ * <p>
+ * The algorithm is as follows: pick two random nodes from a given pool of
+ * nodes and then choose the one with lower utilization. This gives nodes
+ * with lower utilization a higher probability of being picked.
+ * <p>
+ * For those wondering why we pick two nodes at random and keep the less
+ * utilized one: links to the original papers are in HDFS-11564.
+ * <p>
+ * A brief summary -- We rank the nodes from lowest utilized to highest
+ * utilized; there are (s * (s - 1)) / 2 ways to build distinct pairs of
+ * nodes. There are s - k pairs of nodes in which the rank k node is the
+ * less utilized of the pair, so the probability of picking the rank k node
+ * is (2 * (s - k)) / (s * (s - 1)).
+ * <p>
+ * In English: a less utilized node is much more likely to be picked than a
+ * highly utilized one, since we pick two nodes and keep the one with lower
+ * utilization.
+ * <p>
+ * This avoids the problem a purely capacity-based allocation scheme has
+ * when users add new nodes to the cluster: HDFS would send all traffic to
+ * those nodes. Here a new node receives a container only when it is one of
+ * the two randomly picked nodes.
+ * <p>
+ * This leads to an I/O pattern where the lower utilized nodes are favoured
+ * more than higher utilized nodes, but part of the I/O will still go to the
+ * older, higher utilized nodes.
+ * <p>
+ * With this algorithm in place, our hope is that the balancer tool needs to
+ * do little or no work and the cluster will achieve a balanced distribution
+ * over time.
+ */
+public final class SCMContainerPlacementCapacity extends SCMCommonPolicy {
+  @VisibleForTesting
+  static final Logger LOG =
+      LoggerFactory.getLogger(SCMContainerPlacementCapacity.class);
+
+  /**
+   * Constructs a container placement policy that considers only capacity.
+   * That is, this policy tries to place containers based on node weight.
+   *
+   * @param nodeManager Node Manager
+   * @param conf Configuration
+   */
+  public SCMContainerPlacementCapacity(final NodeManager nodeManager,
+      final Configuration conf) {
+    super(nodeManager, conf);
+  }
+
+  /**
+   * Called by SCM to choose datanodes.
+   *
+   * @param nodesRequired - number of datanodes required.
+   * @param sizeRequired - size required for the container or block.
+   * @return List of datanodes.
+   * @throws SCMException  SCMException
+   */
+  @Override
+  public List<DatanodeDetails> chooseDatanodes(
+      final int nodesRequired, final long sizeRequired) throws SCMException {
+    List<DatanodeDetails> healthyNodes =
+        super.chooseDatanodes(nodesRequired, sizeRequired);
+    if (healthyNodes.size() == nodesRequired) {
+      return healthyNodes;
+    }
+    return getResultSet(nodesRequired, healthyNodes);
+  }
+
+  /**
+   * Find a node from the healthy list and return it after removing it from the
+   * list that we are operating on.
+   *
+   * @param healthyNodes - List of healthy nodes that meet the size
+   * requirement.
+   * @return DatanodeDetails that is chosen.
+   */
+  @Override
+  public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) {
+    int firstNodeNdx = getRand().nextInt(healthyNodes.size());
+    int secondNodeNdx = getRand().nextInt(healthyNodes.size());
+
+    DatanodeDetails datanodeDetails;
+    // There is a possibility that both numbers will be the same.
+    // If that is so, we just return that node.
+    if (firstNodeNdx == secondNodeNdx) {
+      datanodeDetails = healthyNodes.get(firstNodeNdx);
+    } else {
+      DatanodeDetails firstNodeDetails = healthyNodes.get(firstNodeNdx);
+      DatanodeDetails secondNodeDetails = healthyNodes.get(secondNodeNdx);
+      SCMNodeMetric firstNodeMetric =
+          getNodeManager().getNodeStat(firstNodeDetails);
+      SCMNodeMetric secondNodeMetric =
+          getNodeManager().getNodeStat(secondNodeDetails);
+      // Pick the less utilized of the two nodes, per the class comment.
+      datanodeDetails = firstNodeMetric.isGreater(secondNodeMetric.get())
+          ? secondNodeDetails : firstNodeDetails;
+    }
+    healthyNodes.remove(datanodeDetails);
+    return datanodeDetails;
+  }
+}
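
As a sanity check on the probability claim above -- 2 * (s - k) / (s * (s - 1))
for the rank k node -- here is a small self-contained simulation, independent
of the SCM classes, that draws two distinct ranks and keeps the lower one
(lower rank standing in for lower utilization):

    import java.util.Random;

    public class PowerOfTwoChoicesDemo {
      public static void main(String[] args) {
        int s = 10;  // nodes ranked 0 (least utilized) to s - 1 (most)
        long[] wins = new long[s];
        Random rand = new Random();
        int trials = 1_000_000;
        for (int t = 0; t < trials; t++) {
          int a = rand.nextInt(s);
          int b = rand.nextInt(s);
          while (b == a) {  // force distinct picks to match the formula
            b = rand.nextInt(s);
          }
          wins[Math.min(a, b)]++;  // lower rank (less utilized) wins
        }
        for (int k = 0; k < s; k++) {
          // with 0-based ranks the formula becomes 2*(s-1-k)/(s*(s-1))
          double expected = 2.0 * (s - 1 - k) / (s * (s - 1.0));
          System.out.printf("rank %d: observed %.4f expected %.4f%n",
              k, wins[k] / (double) trials, expected);
        }
      }
    }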

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
new file mode 100644
index 0000000..9903c84
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.placement.algorithms;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Container placement policy that randomly chooses healthy datanodes.
+ * This is very similar to the current HDFS placement: we just place
+ * containers randomly, without any consideration of utilization.
+ * <p>
+ * That means we rely on the balancer to achieve an even distribution of
+ * data. The balancer will need to support containers as a feature before
+ * this class can be practically used.
+ */
+public final class SCMContainerPlacementRandom extends SCMCommonPolicy
+    implements ContainerPlacementPolicy {
+  @VisibleForTesting
+  static final Logger LOG =
+      LoggerFactory.getLogger(SCMContainerPlacementRandom.class);
+
+  /**
+   * Construct a random Block Placement policy.
+   *
+   * @param nodeManager nodeManager
+   * @param conf Config
+   */
+  public SCMContainerPlacementRandom(final NodeManager nodeManager,
+      final Configuration conf) {
+    super(nodeManager, conf);
+  }
+
+  /**
+   * Called by the SCM to choose datanodes.
+   *
+   * @param nodesRequired - number of datanodes required.
+   * @param sizeRequired - size required for the container or block.
+   * @return List of Datanodes.
+   * @throws SCMException  SCMException
+   */
+  @Override
+  public List<DatanodeDetails> chooseDatanodes(
+      final int nodesRequired, final long sizeRequired) throws SCMException {
+    List<DatanodeDetails> healthyNodes =
+        super.chooseDatanodes(nodesRequired, sizeRequired);
+
+    if (healthyNodes.size() == nodesRequired) {
+      return healthyNodes;
+    }
+    return getResultSet(nodesRequired, healthyNodes);
+  }
+
+  /**
+   * Just choose a node randomly and remove it from the set of nodes we can
+   * choose from.
+   *
+   * @param healthyNodes - all healthy datanodes.
+   * @return one randomly chosen healthy datanode.
+   */
+  @Override
+  public DatanodeDetails chooseNode(final List<DatanodeDetails> healthyNodes) {
+    DatanodeDetails selectedNode =
+        healthyNodes.get(getRand().nextInt(healthyNodes.size()));
+    healthyNodes.remove(selectedNode);
+    return selectedNode;
+  }
+}
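
A hedged sketch of the contract chooseNode relies on: it removes the returned
node from the list it is handed, so repeated calls never pick the same
datanode twice. The healthyNodes list is assumed to be a mutable copy already
filtered by chooseDatanodes:

    SCMContainerPlacementRandom policy =
        new SCMContainerPlacementRandom(nodeManager, conf);
    List<DatanodeDetails> working = new LinkedList<>(healthyNodes);
    DatanodeDetails first = policy.chooseNode(working);   // removed from working
    DatanodeDetails second = policy.chooseNode(working);  // cannot equal first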

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/package-info.java
new file mode 100644
index 0000000..1cb810d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.placement.algorithms;
+// Various placement algorithms.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/ContainerStat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/ContainerStat.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/ContainerStat.java
new file mode 100644
index 0000000..b8e8998
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/ContainerStat.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.placement.metrics;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.web.utils.JsonUtils;
+
+import java.io.IOException;
+
+/**
+ * This class represents the SCM container stat.
+ */
+public class ContainerStat {
+  /**
+   * The maximum container size.
+   */
+  @JsonProperty("Size")
+  private LongMetric size;
+
+  /**
+   * The number of bytes used by the container.
+   */
+  @JsonProperty("Used")
+  private LongMetric used;
+
+  /**
+   * The number of keys in the container.
+   */
+  @JsonProperty("KeyCount")
+  private LongMetric keyCount;
+
+  /**
+   * The number of bytes read from the container.
+   */
+  @JsonProperty("ReadBytes")
+  private LongMetric readBytes;
+
+  /**
+   * The number of bytes written into the container.
+   */
+  @JsonProperty("WriteBytes")
+  private LongMetric writeBytes;
+
+  /**
+   * The number of times the container is read.
+   */
+  @JsonProperty("ReadCount")
+  private LongMetric readCount;
+
+  /**
+   * The number of times the container is written into.
+   */
+  @JsonProperty("WriteCount")
+  private LongMetric writeCount;
+
+  public ContainerStat() {
+    this(0L, 0L, 0L, 0L, 0L, 0L, 0L);
+  }
+
+  public ContainerStat(long size, long used, long keyCount, long readBytes,
+      long writeBytes, long readCount, long writeCount) {
+    Preconditions.checkArgument(size >= 0,
+        "Container size cannot be negative.");
+    Preconditions.checkArgument(used >= 0,
+        "Used space cannot be negative.");
+    Preconditions.checkArgument(keyCount >= 0,
+        "Key count cannot be negative.");
+    Preconditions.checkArgument(readBytes >= 0,
+        "Read bytes cannot be negative.");
+    Preconditions.checkArgument(writeBytes >= 0,
+        "Write bytes cannot be negative.");
+    Preconditions.checkArgument(readCount >= 0,
+        "Read count cannot be negative.");
+    Preconditions.checkArgument(writeCount >= 0,
+        "Write count cannot be negative.");
+
+    this.size = new LongMetric(size);
+    this.used = new LongMetric(used);
+    this.keyCount = new LongMetric(keyCount);
+    this.readBytes = new LongMetric(readBytes);
+    this.writeBytes = new LongMetric(writeBytes);
+    this.readCount = new LongMetric(readCount);
+    this.writeCount = new LongMetric(writeCount);
+  }
+
+  public LongMetric getSize() {
+    return size;
+  }
+
+  public LongMetric getUsed() {
+    return used;
+  }
+
+  public LongMetric getKeyCount() {
+    return keyCount;
+  }
+
+  public LongMetric getReadBytes() {
+    return readBytes;
+  }
+
+  public LongMetric getWriteBytes() {
+    return writeBytes;
+  }
+
+  public LongMetric getReadCount() {
+    return readCount;
+  }
+
+  public LongMetric getWriteCount() {
+    return writeCount;
+  }
+
+  public void add(ContainerStat stat) {
+    if (stat == null) {
+      return;
+    }
+
+    this.size.add(stat.getSize().get());
+    this.used.add(stat.getUsed().get());
+    this.keyCount.add(stat.getKeyCount().get());
+    this.readBytes.add(stat.getReadBytes().get());
+    this.writeBytes.add(stat.getWriteBytes().get());
+    this.readCount.add(stat.getReadCount().get());
+    this.writeCount.add(stat.getWriteCount().get());
+  }
+
+  public void subtract(ContainerStat stat) {
+    if (stat == null) {
+      return;
+    }
+
+    this.size.subtract(stat.getSize().get());
+    this.used.subtract(stat.getUsed().get());
+    this.keyCount.subtract(stat.getKeyCount().get());
+    this.readBytes.subtract(stat.getReadBytes().get());
+    this.writeBytes.subtract(stat.getWriteBytes().get());
+    this.readCount.subtract(stat.getReadCount().get());
+    this.writeCount.subtract(stat.getWriteCount().get());
+  }
+
+  public String toJsonString() {
+    try {
+      return JsonUtils.toJsonString(this);
+    } catch (IOException ignored) {
+      return null;
+    }
+  }
+}
\ No newline at end of file
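
A brief illustration of the aggregation pattern add and subtract support; the
numbers are made up and the argument order follows the constructor (size,
used, keyCount, readBytes, writeBytes, readCount, writeCount):

    // Roll two container reports into a running total, then render as JSON.
    ContainerStat total = new ContainerStat();
    total.add(new ContainerStat(5L, 1L, 10L, 100L, 200L, 3L, 4L));
    total.add(new ContainerStat(5L, 2L, 20L, 300L, 400L, 5L, 6L));
    String json = total.toJsonString();  // null if serialization fails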

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/DatanodeMetric.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/DatanodeMetric.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/DatanodeMetric.java
new file mode 100644
index 0000000..a6e732c
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/DatanodeMetric.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.placement.metrics;
+
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+
+/**
+ * DatanodeMetric acts as the basis for all the metrics used in
+ * comparing two datanodes.
+ */
+public interface DatanodeMetric<T, S> extends Comparable<T> {
+
+  /**
+   * Some syntactic sugar over Comparable interface. This makes code easier to
+   * read.
+   *
+   * @param o - Other Object
+   * @return - True if *this* object is greater than argument.
+   */
+  boolean isGreater(T o);
+
+  /**
+   * Inverse of isGreater.
+   *
+   * @param o - other object.
+   * @return True if *this* object is Lesser than argument.
+   */
+  boolean isLess(T o);
+
+  /**
+   * Returns true if the object has same values. Because of issues with
+   * equals, and loss of type information this interface supports isEqual.
+   *
+   * @param o object to compare.
+   * @return True, if the values match.
+   */
+  boolean isEqual(T o);
+
+  /**
+   * A resource check, defined by resourceNeeded.
+   * For example, S could be the number of bytes required, and the
+   * DatanodeMetric replies whether that requirement can be met.
+   *
+   * @param resourceNeeded - resource needed, in the metric's own units.
+   * @return boolean, True if this resource requirement can be met.
+   */
+  boolean hasResources(S resourceNeeded) throws SCMException;
+
+  /**
+   * Returns the metric.
+   *
+   * @return T, the object that represents this metric.
+   */
+  T get();
+
+  /**
+   * Sets the value of this metric.
+   *
+   * @param value - value of the metric.
+   */
+  void set(T value);
+
+  /**
+   * Adds a value to the base.
+   * @param value - value
+   */
+  void add(T value);
+
+  /**
+   * Subtracts a value.
+   * @param value value
+   */
+  void subtract(T value);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/LongMetric.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/LongMetric.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/LongMetric.java
new file mode 100644
index 0000000..050d26b
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/LongMetric.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.placement.metrics;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+
+/**
+ * A helper class for all metrics based on Longs.
+ */
+@JsonAutoDetect(fieldVisibility = Visibility.ANY)
+public class LongMetric implements DatanodeMetric<Long, Long> {
+  private Long value;
+
+  /**
+   * Constructs a long Metric.
+   *
+   * @param value Value for this metric.
+   */
+  public LongMetric(Long value) {
+    this.value = value;
+  }
+
+  /**
+   * Some syntactic sugar over Comparable interface. This makes code easier to
+   * read.
+   *
+   * @param o - Other Object
+   * @return - True if *this* object is greater than argument.
+   */
+  @Override
+  public boolean isGreater(Long o) {
+    return compareTo(o) > 0;
+  }
+
+  /**
+   * Inverse of isGreater.
+   *
+   * @param o - other object.
+   * @return True if *this* object is Lesser than argument.
+   */
+  @Override
+  public boolean isLess(Long o) {
+    return compareTo(o) < 0;
+  }
+
+  /**
+   * Returns true if the object has same values. Because of issues with
+   * equals, and loss of type information this interface supports isEqual.
+   *
+   * @param o object to compare.
+   * @return True, if the values match.
+   */
+  @Override
+  public boolean isEqual(Long o) {
+    return compareTo(o) == 0;
+  }
+
+  /**
+   * A resource check, defined by resourceNeeded.
+   * For example, S could be the number of bytes required, and the
+   * DatanodeMetric replies whether that requirement can be met.
+   *
+   * @param resourceNeeded - resource needed, in the metric's own units.
+   * @return boolean, True if this resource requirement can be met.
+   */
+  @Override
+  public boolean hasResources(Long resourceNeeded) {
+    return isGreater(resourceNeeded);
+  }
+
+  /**
+   * Returns the metric.
+   *
+   * @return T, the object that represents this metric.
+   */
+  @Override
+  public Long get() {
+    return this.value;
+  }
+
+  /**
+   * Sets the value of this metric.
+   *
+   * @param setValue - value of the metric.
+   */
+  @Override
+  public void set(Long setValue) {
+    this.value = setValue;
+  }
+
+  /**
+   * Adds a value to the base.
+   *
+   * @param addValue - value
+   */
+  @Override
+  public void add(Long addValue) {
+    this.value += addValue;
+  }
+
+  /**
+   * Subtracts a value.
+   *
+   * @param subValue value
+   */
+  @Override
+  public void subtract(Long subValue) {
+    this.value -= subValue;
+  }
+
+  /**
+   * Compares this object with the specified object for order.  Returns a
+   * negative integer, zero, or a positive integer as this object is less
+   * than, equal to, or greater than the specified object.
+   *
+   * @param o the object to be compared.
+   * @return a negative integer, zero, or a positive integer as this object is
+   * less than, equal to, or greater than the specified object.
+   * @throws NullPointerException if the specified object is null
+   * @throws ClassCastException   if the specified object's type prevents it
+   *                              from being compared to this object.
+   */
+  @Override
+  public int compareTo(Long o) {
+    return Long.compare(this.value, o);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    LongMetric that = (LongMetric) o;
+
+    return value != null ? value.equals(that.value) : that.value == null;
+  }
+
+  @Override
+  public int hashCode() {
+    return value != null ? value.hashCode() : 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/NodeStat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/NodeStat.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/NodeStat.java
new file mode 100644
index 0000000..d6857d3
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/NodeStat.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.placement.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Interface that defines Node Stats.
+ */
+interface NodeStat {
+  /**
+   * Get capacity of the node.
+   * @return capacity of the node.
+   */
+  LongMetric getCapacity();
+
+  /**
+   * Get the used space of the node.
+   * @return the used space of the node.
+   */
+  LongMetric getScmUsed();
+
+  /**
+   * Get the remaining space of the node.
+   * @return the remaining space of the node.
+   */
+  LongMetric getRemaining();
+
+  /**
+   * Set the total/used/remaining space.
+   * @param capacity - total space.
+   * @param used - used space.
+   * @param remain - remaining space.
+   */
+  @VisibleForTesting
+  void set(long capacity, long used, long remain);
+
+  /**
+   * Adds the given stat to this one.
+   * @param stat - stat to be added.
+   * @return updated node stat.
+   */
+  NodeStat add(NodeStat stat);
+
+  /**
+   * Subtracts the given stat from this one.
+   * @param stat - stat to be subtracted.
+   * @return updated node stat.
+   */
+  NodeStat subtract(NodeStat stat);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMMetrics.java
new file mode 100644
index 0000000..e4dd9aa
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMMetrics.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.placement.metrics;
+
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+
+/**
+ * This class is for maintaining StorageContainerManager statistics.
+ */
+@Metrics(about="Storage Container Manager Metrics", context="dfs")
+public class SCMMetrics {
+  public static final String SOURCE_NAME =
+      SCMMetrics.class.getSimpleName();
+
+  /**
+   * Container stat metrics, the meaning of following metrics
+   * can be found in {@link ContainerStat}.
+   */
+  @Metric private MutableGaugeLong lastContainerReportSize;
+  @Metric private MutableGaugeLong lastContainerReportUsed;
+  @Metric private MutableGaugeLong lastContainerReportKeyCount;
+  @Metric private MutableGaugeLong lastContainerReportReadBytes;
+  @Metric private MutableGaugeLong lastContainerReportWriteBytes;
+  @Metric private MutableGaugeLong lastContainerReportReadCount;
+  @Metric private MutableGaugeLong lastContainerReportWriteCount;
+
+  @Metric private MutableCounterLong containerReportSize;
+  @Metric private MutableCounterLong containerReportUsed;
+  @Metric private MutableCounterLong containerReportKeyCount;
+  @Metric private MutableCounterLong containerReportReadBytes;
+  @Metric private MutableCounterLong containerReportWriteBytes;
+  @Metric private MutableCounterLong containerReportReadCount;
+  @Metric private MutableCounterLong containerReportWriteCount;
+
+  public SCMMetrics() {
+  }
+
+  public static SCMMetrics create() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(SOURCE_NAME, "Storage Container Manager Metrics",
+        new SCMMetrics());
+  }
+
+  public void setLastContainerReportSize(long size) {
+    this.lastContainerReportSize.set(size);
+  }
+
+  public void setLastContainerReportUsed(long used) {
+    this.lastContainerReportUsed.set(used);
+  }
+
+  public void setLastContainerReportKeyCount(long keyCount) {
+    this.lastContainerReportKeyCount.set(keyCount);
+  }
+
+  public void setLastContainerReportReadBytes(long readBytes) {
+    this.lastContainerReportReadBytes.set(readBytes);
+  }
+
+  public void setLastContainerReportWriteBytes(long writeBytes) {
+    this.lastContainerReportWriteBytes.set(writeBytes);
+  }
+
+  public void setLastContainerReportReadCount(long readCount) {
+    this.lastContainerReportReadCount.set(readCount);
+  }
+
+  public void setLastContainerReportWriteCount(long writeCount) {
+    this.lastContainerReportWriteCount.set(writeCount);
+  }
+
+  public void incrContainerReportSize(long size) {
+    this.containerReportSize.incr(size);
+  }
+
+  public void incrContainerReportUsed(long used) {
+    this.containerReportUsed.incr(used);
+  }
+
+  public void incrContainerReportKeyCount(long keyCount) {
+    this.containerReportKeyCount.incr(keyCount);
+  }
+
+  public void incrContainerReportReadBytes(long readBytes) {
+    this.containerReportReadBytes.incr(readBytes);
+  }
+
+  public void incrContainerReportWriteBytes(long writeBytes) {
+    this.containerReportWriteBytes.incr(writeBytes);
+  }
+
+  public void incrContainerReportReadCount(long readCount) {
+    this.containerReportReadCount.incr(readCount);
+  }
+
+  public void incrContainerReportWriteCount(long writeCount) {
+    this.containerReportWriteCount.incr(writeCount);
+  }
+
+  public void setLastContainerStat(ContainerStat newStat) {
+    this.lastContainerReportSize.set(newStat.getSize().get());
+    this.lastContainerReportUsed.set(newStat.getUsed().get());
+    this.lastContainerReportKeyCount.set(newStat.getKeyCount().get());
+    this.lastContainerReportReadBytes.set(newStat.getReadBytes().get());
+    this.lastContainerReportWriteBytes.set(newStat.getWriteBytes().get());
+    this.lastContainerReportReadCount.set(newStat.getReadCount().get());
+    this.lastContainerReportWriteCount.set(newStat.getWriteCount().get());
+  }
+
+  public void incrContainerStat(ContainerStat deltaStat) {
+    this.containerReportSize.incr(deltaStat.getSize().get());
+    this.containerReportUsed.incr(deltaStat.getUsed().get());
+    this.containerReportKeyCount.incr(deltaStat.getKeyCount().get());
+    this.containerReportReadBytes.incr(deltaStat.getReadBytes().get());
+    this.containerReportWriteBytes.incr(deltaStat.getWriteBytes().get());
+    this.containerReportReadCount.incr(deltaStat.getReadCount().get());
+    this.containerReportWriteCount.incr(deltaStat.getWriteCount().get());
+  }
+
+  public void decrContainerStat(ContainerStat deltaStat) {
+    this.containerReportSize.incr(-1 * deltaStat.getSize().get());
+    this.containerReportUsed.incr(-1 * deltaStat.getUsed().get());
+    this.containerReportKeyCount.incr(-1 * deltaStat.getKeyCount().get());
+    this.containerReportReadBytes.incr(-1 * deltaStat.getReadBytes().get());
+    this.containerReportWriteBytes.incr(-1 * deltaStat.getWriteBytes().get());
+    this.containerReportReadCount.incr(-1 * deltaStat.getReadCount().get());
+    this.containerReportWriteCount.incr(-1 * deltaStat.getWriteCount().get());
+  }
+
+  public void unRegister() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(SOURCE_NAME);
+  }
+}
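
A short sketch of the intended lifecycle of this source, assuming the default
metrics system has already been initialized by the SCM process:

    // Register the source, publish one container report, then unregister.
    SCMMetrics metrics = SCMMetrics.create();
    ContainerStat report = new ContainerStat(5L, 1L, 10L, 100L, 200L, 3L, 4L);
    metrics.setLastContainerStat(report);  // gauges: most recent report
    metrics.incrContainerStat(report);     // counters: running totals
    metrics.unRegister();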

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeMetric.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeMetric.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeMetric.java
new file mode 100644
index 0000000..b50376d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeMetric.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.placement.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * SCM Node Metric that is used in the placement classes.
+ */
+public class SCMNodeMetric implements DatanodeMetric<SCMNodeStat, Long> {
+  private SCMNodeStat stat;
+
+  /**
+   * Constructs an SCMNode Metric.
+   *
+   * @param stat - SCMNodeStat.
+   */
+  public SCMNodeMetric(SCMNodeStat stat) {
+    this.stat = stat;
+  }
+
+  /**
+   * Constructs an SCMNodeMetric from the capacity, used and remaining
+   * space on a datanode.
+   *
+   * @param capacity in bytes
+   * @param used in bytes
+   * @param remaining in bytes
+   */
+  @VisibleForTesting
+  public SCMNodeMetric(long capacity, long used, long remaining) {
+    this.stat = new SCMNodeStat();
+    this.stat.set(capacity, used, remaining);
+  }
+
+   * Returns true if this node's utilization weight is greater; equal
+   * weights are broken in favour of the node with more remaining space.
+   *
+   *
+   * @param o - Other Object
+   * @return - True if *this* object is greater than argument.
+   */
+  @Override
+  public boolean isGreater(SCMNodeStat o) {
+    Preconditions.checkNotNull(o, "Argument cannot be null");
+
+    // if zero, replace with 1 for the division to work.
+    long thisDenominator = (this.stat.getCapacity().get() == 0)
+        ? 1 : this.stat.getCapacity().get();
+    long otherDenominator = (o.getCapacity().get() == 0)
+        ? 1 : o.getCapacity().get();
+
+    float thisNodeWeight =
+        stat.getScmUsed().get() / (float) thisDenominator;
+
+    float oNodeWeight =
+        o.getScmUsed().get() / (float) otherDenominator;
+
+    if (Math.abs(thisNodeWeight - oNodeWeight) > 0.000001) {
+      return thisNodeWeight > oNodeWeight;
+    }
+    // if these nodes have similar weight then return the node with more
+    // free space as the greater node.
+    return stat.getRemaining().isGreater(o.getRemaining().get());
+  }
+
+  /**
+   * Inverse of isGreater.
+   *
+   * @param o - other object.
+   * @return True if *this* object is Lesser than argument.
+   */
+  @Override
+  public boolean isLess(SCMNodeStat o) {
+    Preconditions.checkNotNull(o, "Argument cannot be null");
+
+    // if zero, replace with 1 for the division to work.
+    long thisDenominator = (this.stat.getCapacity().get() == 0)
+        ? 1 : this.stat.getCapacity().get();
+    long otherDenominator = (o.getCapacity().get() == 0)
+        ? 1 : o.getCapacity().get();
+
+    float thisNodeWeight =
+        stat.getScmUsed().get() / (float) thisDenominator;
+
+    float oNodeWeight =
+        o.getScmUsed().get() / (float) otherDenominator;
+
+    if (Math.abs(thisNodeWeight - oNodeWeight) > 0.000001) {
+      return thisNodeWeight < oNodeWeight;
+    }
+
+    // if these nodes have similar weight then return the node with less
+    // free space as the lesser node.
+    return stat.getRemaining().isLess(o.getRemaining().get());
+  }
+
+  /**
+   * Returns true if the object has same values. Because of issues with
+   * equals, and loss of type information this interface supports isEqual.
+   *
+   * @param o object to compare.
+   * @return True, if the values match.
+   * TODO : Consider if it makes sense to add remaining to this equation.
+   */
+  @Override
+  public boolean isEqual(SCMNodeStat o) {
+    // if zero, replace with 1 for the division to work, as in isGreater.
+    long thisDenominator = (this.stat.getCapacity().get() == 0)
+        ? 1 : this.stat.getCapacity().get();
+    long otherDenominator = (o.getCapacity().get() == 0)
+        ? 1 : o.getCapacity().get();
+    float thisNodeWeight = stat.getScmUsed().get() / (float) thisDenominator;
+    float oNodeWeight = o.getScmUsed().get() / (float) otherDenominator;
+    return Math.abs(thisNodeWeight - oNodeWeight) < 0.000001;
+  }
+
+  /**
+   * A resource check, defined by resourceNeeded.
+   * For example, S could be the number of bytes required, and the
+   * DatanodeMetric replies whether that requirement can be met.
+   *
+   * @param resourceNeeded - resource needed, in the metric's own units.
+   * @return boolean, True if this resource requirement can be met.
+   */
+  @Override
+  public boolean hasResources(Long resourceNeeded) {
+    // Not implemented for node metrics; callers check the remaining-space
+    // LongMetric directly (see SCMCommonPolicy#hasEnoughSpace).
+    return false;
+  }
+
+  /**
+   * Returns the metric.
+   *
+   * @return T, the object that represents this metric.
+   */
+  @Override
+  public SCMNodeStat get() {
+    return stat;
+  }
+
+  /**
+   * Sets the value of this metric.
+   *
+   * @param value - value of the metric.
+   */
+  @Override
+  public void set(SCMNodeStat value) {
+    stat.set(value.getCapacity().get(), value.getScmUsed().get(),
+        value.getRemaining().get());
+  }
+
+  /**
+   * Adds a value to the base.
+   *
+   * @param value - value
+   */
+  @Override
+  public void add(SCMNodeStat value) {
+    stat.add(value);
+  }
+
+  /**
+   * Subtracts a value.
+   *
+   * @param value value
+   */
+  @Override
+  public void subtract(SCMNodeStat value) {
+    stat.subtract(value);
+  }
+
+  /**
+   * Compares this object with the specified object for order.  Returns a
+   * negative integer, zero, or a positive integer as this object is less
+   * than, equal to, or greater than the specified object.
+   *
+   * @param o the object to be compared.
+   * @return a negative integer, zero, or a positive integer as this object is
+   * less than, equal to, or greater than the specified object.
+   * @throws NullPointerException if the specified object is null
+   * @throws ClassCastException   if the specified object's type prevents it
+   *                              from being compared to this object.
+   */
+  @Override
+  public int compareTo(SCMNodeStat o) {
+    if (isEqual(o)) {
+      return 0;
+    }
+    if (isGreater(o)) {
+      return 1;
+    } else {
+      return -1;
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    SCMNodeMetric that = (SCMNodeMetric) o;
+
+    return stat != null ? stat.equals(that.stat) : that.stat == null;
+  }
+
+  @Override
+  public int hashCode() {
+    return stat != null ? stat.hashCode() : 0;
+  }
+}
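
To make the weight comparison concrete, a small example using the test-only
constructor (capacity, used, remaining); the byte values are made up:

    // Weight is used/capacity; the more utilized node compares greater.
    SCMNodeMetric nodeA = new SCMNodeMetric(100L, 50L, 50L);   // weight 0.50
    SCMNodeMetric nodeB = new SCMNodeMetric(200L, 50L, 150L);  // weight 0.25
    boolean aMoreUtilized = nodeA.isGreater(nodeB.get());      // true
    // Equal weights fall back to remaining space: more free space is greater.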

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java
new file mode 100644
index 0000000..3c871d3
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.placement.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * This class represents the SCM node stat.
+ */
+public class SCMNodeStat implements NodeStat {
+  private LongMetric capacity;
+  private LongMetric scmUsed;
+  private LongMetric remaining;
+
+  public SCMNodeStat() {
+    this(0L, 0L, 0L);
+  }
+
+  public SCMNodeStat(SCMNodeStat other) {
+    this(other.capacity.get(), other.scmUsed.get(), other.remaining.get());
+  }
+
+  public SCMNodeStat(long capacity, long used, long remaining) {
+    Preconditions.checkArgument(capacity >= 0, "Capacity cannot be " +
+        "negative.");
+    Preconditions.checkArgument(used >= 0, "used space cannot be " +
+        "negative.");
+    Preconditions.checkArgument(remaining >= 0, "remaining cannot be " +
+        "negative");
+    this.capacity = new LongMetric(capacity);
+    this.scmUsed = new LongMetric(used);
+    this.remaining = new LongMetric(remaining);
+  }
+
+  /**
+   * @return the total configured capacity of the node.
+   */
+  public LongMetric getCapacity() {
+    return capacity;
+  }
+
+  /**
+   * @return the total SCM used space on the node.
+   */
+  public LongMetric getScmUsed() {
+    return scmUsed;
+  }
+
+  /**
+   * @return the total remaining space available on the node.
+   */
+  public LongMetric getRemaining() {
+    return remaining;
+  }
+
+  /**
+   * Set the capacity, used and remaining space on a datanode.
+   *
+   * @param newCapacity in bytes
+   * @param newUsed in bytes
+   * @param newRemaining in bytes
+   */
+  @VisibleForTesting
+  public void set(long newCapacity, long newUsed, long newRemaining) {
+    // checkNotNull is meaningless on long primitives, so only validate range.
+    Preconditions.checkArgument(newCapacity >= 0,
+        "Capacity cannot be negative.");
+    Preconditions.checkArgument(newUsed >= 0,
+        "Used space cannot be negative.");
+    Preconditions.checkArgument(newRemaining >= 0,
+        "Remaining space cannot be negative.");
+
+    this.capacity = new LongMetric(newCapacity);
+    this.scmUsed = new LongMetric(newUsed);
+    this.remaining = new LongMetric(newRemaining);
+  }
+
+  /**
+   * Adds a new node stat to the existing values of the node.
+   *
+   * @param stat Nodestat.
+   * @return SCMNodeStat
+   */
+  public SCMNodeStat add(NodeStat stat) {
+    this.capacity.set(this.getCapacity().get() + stat.getCapacity().get());
+    this.scmUsed.set(this.getScmUsed().get() + stat.getScmUsed().get());
+    this.remaining.set(this.getRemaining().get() + stat.getRemaining().get());
+    return this;
+  }
+
+  /**
+   * Subtracts the stat values from the existing NodeStat.
+   *
+   * @param stat SCMNodeStat.
+   * @return Modified SCMNodeStat
+   */
+  public SCMNodeStat subtract(NodeStat stat) {
+    this.capacity.set(this.getCapacity().get() - stat.getCapacity().get());
+    this.scmUsed.set(this.getScmUsed().get() - stat.getScmUsed().get());
+    this.remaining.set(this.getRemaining().get() - stat.getRemaining().get());
+    return this;
+  }
+
+  @Override
+  public boolean equals(Object to) {
+    if (to instanceof SCMNodeStat) {
+      SCMNodeStat tempStat = (SCMNodeStat) to;
+      return capacity.isEqual(tempStat.getCapacity().get()) &&
+          scmUsed.isEqual(tempStat.getScmUsed().get()) &&
+          remaining.isEqual(tempStat.getRemaining().get());
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Long.hashCode(capacity.get() ^ scmUsed.get() ^ remaining.get());
+  }
+}
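
And a quick illustration of the add/subtract bookkeeping SCMNodeStat is built
for, with made-up byte counts:

    // Cluster-wide totals maintained from per-node reports.
    SCMNodeStat clusterTotal = new SCMNodeStat();
    SCMNodeStat nodeReport = new SCMNodeStat(100L, 40L, 60L);
    clusterTotal.add(nodeReport);       // node joins: totals go up
    clusterTotal.subtract(nodeReport);  // node leaves: rolls back to zero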

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/package-info.java
new file mode 100644
index 0000000..4a81d69
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/package-info.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.placement.metrics;
+
+// Various metrics supported by Datanode and used by SCM in the placement
+// strategy.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/package-info.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/package-info.java
new file mode 100644
index 0000000..dc54d9b
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.placement;
+// Classes related to container placement.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java
new file mode 100644
index 0000000..52321ee
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodePoolManager;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static com.google.common.util.concurrent.Uninterruptibles
+    .sleepUninterruptibly;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT;
+
+/**
+ * This class takes a set of container reports that belong to a pool and then
+ * computes the replication levels for each container.
+ */
+public class ContainerSupervisor implements Closeable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ContainerSupervisor.class);
+
+  private final NodePoolManager poolManager;
+  private final HashSet<String> poolNames;
+  private final PriorityQueue<PeriodicPool> poolQueue;
+  private final NodeManager nodeManager;
+  private final long containerProcessingLag;
+  private final AtomicBoolean runnable;
+  private final ExecutorService executorService;
+  private final long maxPoolWait;
+  private long poolProcessCount;
+  private final List<InProgressPool> inProgressPoolList;
+  private final AtomicInteger threadFaultCount;
+  private final int inProgressPoolMaxCount;
+
+  private final ReadWriteLock inProgressPoolListLock;
+
+  /**
+   * Returns the number of times we have processed pools.
+   * @return long
+   */
+  public long getPoolProcessCount() {
+    return poolProcessCount;
+  }
+
+  /**
+   * Constructs a class that computes Replication Levels.
+   *
+   * @param conf - OzoneConfiguration
+   * @param nodeManager - Node Manager
+   * @param poolManager - Pool Manager
+   */
+  public ContainerSupervisor(Configuration conf, NodeManager nodeManager,
+                             NodePoolManager poolManager) {
+    Preconditions.checkNotNull(poolManager);
+    Preconditions.checkNotNull(nodeManager);
+    this.containerProcessingLag =
+        conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL,
+            OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT,
+            TimeUnit.SECONDS
+        ) * 1000;
+    int maxContainerReportThreads =
+        conf.getInt(OZONE_SCM_MAX_CONTAINER_REPORT_THREADS,
+            OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT
+        );
+    this.maxPoolWait =
+        conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT,
+            OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT,
+            TimeUnit.MILLISECONDS);
+    this.inProgressPoolMaxCount = conf.getInt(
+        OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS,
+        OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT);
+    this.poolManager = poolManager;
+    this.nodeManager = nodeManager;
+    this.poolNames = new HashSet<>();
+    this.poolQueue = new PriorityQueue<>();
+    this.runnable = new AtomicBoolean(true);
+    this.threadFaultCount = new AtomicInteger(0);
+    this.executorService = HadoopExecutors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("Container Reports Processing Thread - %d")
+            .build(), maxContainerReportThreads);
+    this.inProgressPoolList = new LinkedList<>();
+    this.inProgressPoolListLock = new ReentrantReadWriteLock();
+
+    initPoolProcessThread();
+  }
+
+  /**
+   * Returns the number of pools that are being processed right now.
+   * @return int - Number of pools that are in process.
+   */
+  public int getInProgressPoolCount() {
+    return inProgressPoolList.size();
+  }
+
+  /**
+   * Signals the background thread to exit.
+   */
+  public void setExit() {
+    this.runnable.set(false);
+  }
+
+  /**
+   * Adds or removes pools from the set of pool names that we track.
+   *
+   * There are two cases to handle: new pools may have been added, and
+   * existing pools may have been removed.
+   */
+  private void refreshPools() {
+    List<String> pools = this.poolManager.getNodePools();
+    if (pools != null) {
+
+      HashSet<String> removedPools =
+          computePoolDifference(this.poolNames, new HashSet<>(pools));
+
+      HashSet<String> addedPools =
+          computePoolDifference(new HashSet<>(pools), this.poolNames);
+      // TODO: Support remove pool API in pool manager so that this code
+      // path can be tested. This never happens in the current code base.
+      for (String poolName : removedPools) {
+        // Removing entries while iterating the queue would throw a
+        // ConcurrentModificationException, so use removeIf instead.
+        poolQueue.removeIf(periodicPool ->
+            periodicPool.getPoolName().equals(poolName));
+      }
+      // Remove the pool names that we have in the list.
+      this.poolNames.removeAll(removedPools);
+
+      for (String poolName : addedPools) {
+        poolQueue.add(new PeriodicPool(poolName));
+      }
+
+      // Add to the pool names we are tracking.
+      poolNames.addAll(addedPools);
+    }
+  }
+
+  /**
+   * Computes the set of pools that are present in newPools but not in
+   * oldPool, i.e. newPools - oldPool.
+   *
+   * @param newPools - New pool list.
+   * @param oldPool - Old pool list.
+   * @return Names present only in newPools.
+   */
+  private HashSet<String> computePoolDifference(HashSet<String> newPools,
+      Set<String> oldPool) {
+    Preconditions.checkNotNull(newPools);
+    Preconditions.checkNotNull(oldPool);
+    HashSet<String> newSet = new HashSet<>(newPools);
+    newSet.removeAll(oldPool);
+    return newSet;
+  }
+
+  private void initPoolProcessThread() {
+
+    /*
+     * Task that runs to check if we need to start a pool processing job.
+     * If so, we create a pool reconciliation job and find out whether all
+     * the expected containers are on the nodes.
+     */
+    Runnable processPools = () -> {
+      while (runnable.get()) {
+        // Make sure that we don't have any new pools.
+        refreshPools();
+        while (inProgressPoolList.size() < inProgressPoolMaxCount) {
+          PeriodicPool pool = poolQueue.poll();
+          if (pool != null) {
+            if (pool.getLastProcessedTime() + this.containerProcessingLag >
+                Time.monotonicNow()) {
+              LOG.debug("Not within the time window for processing: {}",
+                  pool.getPoolName());
+              // we might over sleep here, not a big deal.
+              sleepUninterruptibly(this.containerProcessingLag,
+                  TimeUnit.MILLISECONDS);
+            }
+            LOG.debug("Adding pool {} to container processing queue",
+                pool.getPoolName());
+            InProgressPool inProgressPool = new InProgressPool(maxPoolWait,
+                pool, this.nodeManager, this.poolManager,
+                this.executorService);
+            inProgressPool.startReconciliation();
+            inProgressPoolListLock.writeLock().lock();
+            try {
+              inProgressPoolList.add(inProgressPool);
+            } finally {
+              inProgressPoolListLock.writeLock().unlock();
+            }
+            poolProcessCount++;
+          } else {
+            break;
+          }
+        }
+        sleepUninterruptibly(this.maxPoolWait, TimeUnit.MILLISECONDS);
+        inProgressPoolListLock.readLock().lock();
+        try {
+          for (InProgressPool inProgressPool : inProgressPoolList) {
+            inProgressPool.finalizeReconciliation();
+            poolQueue.add(inProgressPool.getPool());
+          }
+        } finally {
+          inProgressPoolListLock.readLock().unlock();
+        }
+        inProgressPoolListLock.writeLock().lock();
+        try {
+          inProgressPoolList.clear();
+        } finally {
+          inProgressPoolListLock.writeLock().unlock();
+        }
+      }
+    };
+
+    // We will have only one thread for pool processing.
+    Thread poolProcessThread = new Thread(processPools);
+    poolProcessThread.setDaemon(true);
+    poolProcessThread.setName("Pool replica thread");
+    poolProcessThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
+      // Let us just restart this thread after logging a critical error.
+      // if this thread is not running we cannot handle commands from SCM.
+      LOG.error("Critical Error : Pool replica thread encountered an " +
+          "error. Thread: {} Error Count : {}", t.toString(),
+          threadFaultCount.incrementAndGet(), e);
+      // A thread that has terminated cannot be start()-ed again, so spin up
+      // a fresh pool processing thread instead.
+      initPoolProcessThread();
+      // TODO : Add a config to restrict how many times we will restart this
+      // thread in a single session.
+    });
+    poolProcessThread.start();
+  }
+
+  /**
+   * Adds a container report to the appropriate in-progress pool.
+   * @param containerReport - Container report from a datanode, covering the
+   * containers on that node.
+   */
+  public void handleContainerReport(
+      ContainerReportsRequestProto containerReport) {
+    DatanodeDetails datanodeDetails = DatanodeDetails.getFromProtoBuf(
+        containerReport.getDatanodeDetails());
+    inProgressPoolListLock.readLock().lock();
+    try {
+      String poolName = poolManager.getNodePool(datanodeDetails);
+      for (InProgressPool ppool : inProgressPoolList) {
+        if (ppool.getPoolName().equalsIgnoreCase(poolName)) {
+          ppool.handleContainerReport(containerReport);
+          return;
+        }
+      }
+      // TODO: Decide if we can do anything else with this report.
+      LOG.debug("Discarding the container report for pool {}. " +
+              "That pool is not currently in the pool reconciliation process." 
+
+              " Container Name: {}", poolName,
+          containerReport.getDatanodeDetails());
+    } catch (SCMException e) {
+      LOG.warn("Skipping processing container report from datanode {}, "
+              + "cause: failed to get the corresponding node pool",
+          datanodeDetails.toString(), e);
+    } finally {
+      inProgressPoolListLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Gets the in-progress pool list; used for testing.
+   * @return List of InProgressPool
+   */
+  @VisibleForTesting
+  public List<InProgressPool> getInProcessPoolList() {
+    return inProgressPoolList;
+  }
+
+  /**
+   * Shuts down the Container Supervisor.
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public void close() throws IOException {
+    setExit();
+    HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
+  }
+}
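
A rough wiring sketch (not part of the patch) of how a caller might own the
supervisor's lifecycle. The nodeManager, poolManager and report values are
assumed to be supplied by the surrounding SCM; imports match those at the top
of ContainerSupervisor.java:

    static void runSupervisor(Configuration conf, NodeManager nodeManager,
        NodePoolManager poolManager, ContainerReportsRequestProto report)
        throws IOException {
      // ContainerSupervisor implements Closeable, so try-with-resources
      // guarantees the pool-processing thread and executor are shut down.
      try (ContainerSupervisor supervisor =
          new ContainerSupervisor(conf, nodeManager, poolManager)) {
        // SCM's report dispatcher would call this once per datanode report;
        // the supervisor routes it to the matching in-progress pool, if any.
        supervisor.handleContainerReport(report);
      }
    }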

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java
new file mode 100644
index 0000000..ddbd213
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodePoolManager;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerInfo;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static com.google.common.util.concurrent.Uninterruptibles
+    .sleepUninterruptibly;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
+    .HEALTHY;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
+    .INVALID;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
+
+/**
+ * An in-progress pool is a pool that is actively being checked for the
+ * replication status of its containers.
+ */
+public final class InProgressPool {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(InProgressPool.class);
+
+  private final PeriodicPool pool;
+  private final NodeManager nodeManager;
+  private final NodePoolManager poolManager;
+  private final ExecutorService executorService;
+  private final Map<String, Integer> containerCountMap;
+  private final Map<UUID, Boolean> processedNodeSet;
+  private final long startTime;
+  private ProgressStatus status;
+  private AtomicInteger nodeCount;
+  private AtomicInteger nodeProcessed;
+  private AtomicInteger containerProcessedCount;
+  private long maxWaitTime;
+
+  /**
+   * Constructs a pool that is being processed.
+   *
+   * @param maxWaitTime - Maximum wait time in milliseconds.
+   * @param pool - Pool that we are working against.
+   * @param nodeManager - Node Manager.
+   * @param poolManager - Pool Manager.
+   * @param executorService - Shared Executor service.
+   */
+  InProgressPool(long maxWaitTime, PeriodicPool pool,
+      NodeManager nodeManager, NodePoolManager poolManager,
+      ExecutorService executorService) {
+    Preconditions.checkNotNull(pool);
+    Preconditions.checkNotNull(nodeManager);
+    Preconditions.checkNotNull(poolManager);
+    Preconditions.checkNotNull(executorService);
+    Preconditions.checkArgument(maxWaitTime > 0);
+    this.pool = pool;
+    this.nodeManager = nodeManager;
+    this.poolManager = poolManager;
+    this.executorService = executorService;
+    this.containerCountMap = new ConcurrentHashMap<>();
+    this.processedNodeSet = new ConcurrentHashMap<>();
+    this.maxWaitTime = maxWaitTime;
+    startTime = Time.monotonicNow();
+  }
+
+  /**
+   * Returns periodic pool.
+   *
+   * @return PeriodicPool
+   */
+  public PeriodicPool getPool() {
+    return pool;
+  }
+
+  /**
+   * We are done if we have received reports from all nodes or we have
+   * waited longer than the configured maximum wait time.
+   *
+   * @return true if we are done, false otherwise.
+   */
+  public boolean isDone() {
+    return (nodeCount.get() == nodeProcessed.get()) ||
+        (this.startTime + this.maxWaitTime) < Time.monotonicNow();
+  }
+
+  /**
+   * Gets the number of containers processed.
+   *
+   * @return int
+   */
+  public int getContainerProcessedCount() {
+    return containerProcessedCount.get();
+  }
+
+  /**
+   * Returns the start time in milliseconds.
+   *
+   * @return - Start Time.
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /**
+   * Get the number of nodes in this pool.
+   *
+   * @return - node count
+   */
+  public int getNodeCount() {
+    return nodeCount.get();
+  }
+
+  /**
+   * Get the number of nodes that we have already processed container reports
+   * from.
+   *
+   * @return - Processed count.
+   */
+  public int getNodeProcessed() {
+    return nodeProcessed.get();
+  }
+
+  /**
+   * Returns the current status.
+   *
+   * @return Status
+   */
+  public ProgressStatus getStatus() {
+    return status;
+  }
+
+  /**
+   * Starts the reconciliation process for all the nodes in the pool.
+   */
+  public void startReconciliation() {
+    List<DatanodeDetails> datanodeDetailsList =
+        this.poolManager.getNodes(pool.getPoolName());
+    if (datanodeDetailsList.isEmpty()) {
+      LOG.error("Datanode list for {} is empty. Pool with no nodes?",
+          pool.getPoolName());
+      this.status = ProgressStatus.Error;
+      return;
+    }
+
+    nodeProcessed = new AtomicInteger(0);
+    containerProcessedCount = new AtomicInteger(0);
+    nodeCount = new AtomicInteger(0);
+    /*
+       Ask each datanode to send us its container reports.
+     */
+    SendContainerCommand cmd = SendContainerCommand.newBuilder().build();
+    for (DatanodeDetails dd : datanodeDetailsList) {
+      NodeState currentState = getNodeState(dd);
+      if (currentState == HEALTHY || currentState == STALE) {
+        nodeCount.incrementAndGet();
+        // Queue commands to all datanodes in this pool to send us container
+        // reports. Since we ignore dead nodes, it is possible that we would
+        // have over-replicated the container if the node comes back.
+        nodeManager.addDatanodeCommand(dd.getUuid(), cmd);
+      }
+    }
+    this.status = ProgressStatus.InProgress;
+    this.getPool().setLastProcessedTime(Time.monotonicNow());
+  }
+
+  /**
+   * Gets the node state.
+   *
+   * @param datanode - datanode information.
+   * @return NodeState.
+   */
+  private NodeState getNodeState(DatanodeDetails datanode) {
+    NodeState currentState = INVALID;
+    int maxTry = 100;
+    // We need to loop to make sure that we will retry if we get
+    // node state unknown. This can lead to an infinite loop if we send
+    // in an unknown node ID, so a max try count is used to prevent it.
+
+    int currentTry = 0;
+    while (currentState == INVALID && currentTry < maxTry) {
+      // Retry to make sure that we deal with the case of node state not
+      // known.
+      currentState = nodeManager.getNodeState(datanode);
+      currentTry++;
+      if (currentState == INVALID) {
+        // Sleep to make sure that this is not a tight loop.
+        sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+      }
+    }
+    if (currentState == INVALID) {
+      LOG.error("Not able to determine the state of Node: {}, Exceeded max " +
+          "try and node manager returns INVALID state. This indicates we " +
+          "are dealing with a node that we don't know about.", datanode);
+    }
+    return currentState;
+  }
+
+  /**
+   * Queues a container report for handling. This is done in a worker thread
+   * since decoding a container report might be compute intensive. We don't
+   * want to block since we have asked for a bunch of container reports
+   * from a set of datanodes.
+   *
+   * @param containerReport - ContainerReport
+   */
+  public void handleContainerReport(
+      ContainerReportsRequestProto containerReport) {
+    if (status == ProgressStatus.InProgress) {
+      executorService.submit(processContainerReport(containerReport));
+    } else {
+      LOG.debug("Cannot handle container report when the pool is in {} 
status.",
+          status);
+    }
+  }
+
+  private Runnable processContainerReport(
+      ContainerReportsRequestProto reports) {
+    return () -> {
+      DatanodeDetails datanodeDetails =
+          DatanodeDetails.getFromProtoBuf(reports.getDatanodeDetails());
+      // putIfAbsent returns null only the first time a node is seen, so each
+      // datanode's report is counted exactly once; computeIfAbsent would
+      // return true for already-processed nodes as well.
+      if (processedNodeSet.putIfAbsent(datanodeDetails.getUuid(),
+          Boolean.TRUE) == null) {
+        nodeProcessed.incrementAndGet();
+        LOG.debug("Total Nodes processed : {} Node Name: {} ", nodeProcessed,
+            datanodeDetails.getUuid());
+        for (ContainerInfo info : reports.getReportsList()) {
+          containerProcessedCount.incrementAndGet();
+          LOG.debug("Total Containers processed: {} Container Name: {}",
+              containerProcessedCount.get(), info.getContainerName());
+
+          // Update the container map with count + 1 if the key exists or
+          // update the map with 1. Since this is a concurrentMap the
+          // computation and update is atomic.
+          containerCountMap.merge(info.getContainerName(), 1, Integer::sum);
+        }
+      }
+    };
+  }
+
+  /**
+   * Filter the containers based on specific rules.
+   *
+   * @param predicate -- Predicate to filter by
+   * @return A list of map entries.
+   */
+  public List<Map.Entry<String, Integer>> filterContainer(
+      Predicate<Map.Entry<String, Integer>> predicate) {
+    return containerCountMap.entrySet().stream()
+        .filter(predicate).collect(Collectors.toList());
+  }
+
+  /**
+   * Used only for testing; calling this will abort container report
+   * processing. This is a very dangerous call and should not be made by
+   * any users.
+   */
+  @VisibleForTesting
+  public void setDoneProcessing() {
+    nodeProcessed.set(nodeCount.get());
+  }
+
+  /**
+   * Returns the pool name.
+   *
+   * @return Name of the pool.
+   */
+  String getPoolName() {
+    return pool.getPoolName();
+  }
+
+  public void finalizeReconciliation() {
+    status = ProgressStatus.Done;
+    //TODO: Add finalizing logic. This is where actual reconciliation happens.
+  }
+
+  /**
+   * Current status of the computing replication status.
+   */
+  public enum ProgressStatus {
+    InProgress, Done, Error
+  }
+}
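
A rough sketch (not part of the patch) of how filterContainer() above could
be used once reconciliation has run; the expectedReplicaCount parameter is
hypothetical, and the imports match those at the top of InProgressPool.java:

    static List<Map.Entry<String, Integer>> underReplicated(
        InProgressPool pool, int expectedReplicaCount) {
      // containerCountMap holds the replica counts observed in the reports;
      // keep only the entries that fall short of the expected factor.
      return pool.filterContainer(
          entry -> entry.getValue() < expectedReplicaCount);
    }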

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/PeriodicPool.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/PeriodicPool.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/PeriodicPool.java
new file mode 100644
index 0000000..ef28aa7
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/PeriodicPool.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Periodic pool is a pool with a time stamp; this allows us to process pools
+ * based on a cyclic clock.
+ */
+public class PeriodicPool implements Comparable<PeriodicPool> {
+  private final String poolName;
+  private long lastProcessedTime;
+  private final AtomicLong totalProcessedCount;
+
+  /**
+   * Constructs a periodic pool.
+   *
+   * @param poolName - Name of the pool
+   */
+  public PeriodicPool(String poolName) {
+    this.poolName = poolName;
+    lastProcessedTime = 0;
+    totalProcessedCount = new AtomicLong(0);
+  }
+
+  /**
+   * Get pool Name.
+   * @return PoolName
+   */
+  public String getPoolName() {
+    return poolName;
+  }
+
+  /**
+   * Compares this object with the specified object for order.  Returns a
+   * negative integer, zero, or a positive integer as this object is less
+   * than, equal to, or greater than the specified object.
+   *
+   * @param o the object to be compared.
+   * @return a negative integer, zero, or a positive integer as this object is
+   * less than, equal to, or greater than the specified object.
+   * @throws NullPointerException if the specified object is null
+   * @throws ClassCastException   if the specified object's type prevents it
+   *                              from being compared to this object.
+   */
+  @Override
+  public int compareTo(PeriodicPool o) {
+    return Long.compare(this.lastProcessedTime, o.lastProcessedTime);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    PeriodicPool that = (PeriodicPool) o;
+
+    return poolName.equals(that.poolName);
+  }
+
+  @Override
+  public int hashCode() {
+    return poolName.hashCode();
+  }
+
+  /**
+   * Returns the total number of times we have processed this pool.
+   *
+   * @return processed count.
+   */
+  public long getTotalProcessedCount() {
+    return totalProcessedCount.get();
+  }
+
+  /**
+   * Gets the last time we processed this pool.
+   * @return time in milliseconds
+   */
+  public long getLastProcessedTime() {
+    return this.lastProcessedTime;
+  }
+
+  /**
+   * Sets the last processed time.
+   *
+   * @param lastProcessedTime - Long in milliseconds.
+   */
+  public void setLastProcessedTime(long lastProcessedTime) {
+    this.lastProcessedTime = lastProcessedTime;
+  }
+
+  /**
+   * Increments the total processed count.
+   */
+  public void incTotalProcessedCount() {
+    this.totalProcessedCount.incrementAndGet();
+  }
+}
\ No newline at end of file
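
A rough sketch (not part of the patch) of why compareTo() orders on
lastProcessedTime: ContainerSupervisor keeps PeriodicPool objects in a
PriorityQueue, so the least recently processed pool surfaces first. The pool
names here are hypothetical:

    PriorityQueue<PeriodicPool> queue = new PriorityQueue<>();
    PeriodicPool stale = new PeriodicPool("pool-a"); // lastProcessedTime == 0
    PeriodicPool fresh = new PeriodicPool("pool-b");
    fresh.setLastProcessedTime(Time.monotonicNow());
    queue.add(fresh);
    queue.add(stale);
    assert queue.poll() == stale; // oldest timestamp comes out first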

