http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 7078b8f..42b39f9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -17,17 +17,12 @@
 
 package org.apache.hadoop.hdds.scm.container;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
 import org.apache.hadoop.hdds.scm.container.states.ContainerState;
 import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
@@ -45,11 +40,8 @@ import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
@@ -116,7 +108,7 @@ import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
  * TimeOut Delete Container State Machine - if the container creating times out,
  * then Container State manager decides to delete the container.
  */
-public class ContainerStateManager implements Closeable {
+public class ContainerStateManager {
   private static final Logger LOG =
       LoggerFactory.getLogger(ContainerStateManager.class);
 
@@ -135,11 +127,10 @@ public class ContainerStateManager implements Closeable {
    * TODO : Add Container Tags so we know which containers are owned by SCM.
    */
   @SuppressWarnings("unchecked")
-  public ContainerStateManager(Configuration configuration,
-      ContainerManager containerManager, PipelineSelector pipelineSelector) {
+  public ContainerStateManager(final Configuration configuration) {
 
     // Initialize the container state machine.
-    Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
+    final Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
 
     // These are the steady states of a container.
     finalStates.add(LifeCycleState.OPEN);
@@ -155,22 +146,9 @@ public class ContainerStateManager implements Closeable {
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
         StorageUnit.BYTES);
 
-    lastUsedMap = new ConcurrentHashMap<>();
-    containerCount = new AtomicLong(0);
-    containers = new ContainerStateMap();
-  }
-
-  /**
-   * Return the info of all the containers kept by the in-memory mapping.
-   *
-   * @return the list of all container info.
-   */
-  public List<ContainerInfo> getAllContainers() {
-    List<ContainerInfo> list = new ArrayList<>();
-
-    //No Locking needed since the return value is an immutable map.
-    containers.getContainerMap().forEach((key, value) -> list.add(value));
-    return list;
+    this.lastUsedMap = new ConcurrentHashMap<>();
+    this.containerCount = new AtomicLong(0);
+    this.containers = new ContainerStateMap();
   }
 
   /*
@@ -244,17 +222,15 @@ public class ContainerStateManager implements Closeable {
         LifeCycleEvent.CLEANUP);
   }
 
-  public void addExistingContainer(ContainerInfo containerInfo)
+  void loadContainer(final ContainerInfo containerInfo)
       throws SCMException {
     containers.addContainer(containerInfo);
-    long containerID = containerInfo.getContainerID();
-    if (containerCount.get() < containerID) {
-      containerCount.set(containerID);
-    }
+    containerCount.set(Long.max(
+        containerInfo.getContainerID(), containerCount.get()));
   }
 
   /**
-   * allocates a new container based on the type, replication etc.
+   * Allocates a new container based on the type, replication etc.
    *
    * @param selector -- Pipeline selector class.
    * @param type -- Replication type.
@@ -262,25 +238,22 @@ public class ContainerStateManager implements Closeable {
    * @return ContainerWithPipeline
    * @throws IOException  on Failure.
    */
-  public ContainerWithPipeline allocateContainer(PipelineSelector selector,
-      HddsProtos.ReplicationType type,
-      HddsProtos.ReplicationFactor replicationFactor, String owner)
+  ContainerInfo allocateContainer(final PipelineSelector selector,
+      final HddsProtos.ReplicationType type,
+      final HddsProtos.ReplicationFactor replicationFactor, final String owner)
       throws IOException {
 
-    Pipeline pipeline = selector.getReplicationPipeline(type,
+    final Pipeline pipeline = selector.getReplicationPipeline(type,
         replicationFactor);
 
     Preconditions.checkNotNull(pipeline, "Pipeline type=%s/"
         + "replication=%s couldn't be found for the new container. "
         + "Do you have enough nodes?", type, replicationFactor);
 
-    long containerID = containerCount.incrementAndGet();
-    ContainerInfo containerInfo = new ContainerInfo.Builder()
+    final long containerID = containerCount.incrementAndGet();
+    final ContainerInfo containerInfo = new ContainerInfo.Builder()
         .setState(HddsProtos.LifeCycleState.ALLOCATED)
         .setPipelineID(pipeline.getId())
-        // This is bytes allocated for blocks inside container, not the
-        // container size
-        .setAllocatedBytes(0)
         .setUsedBytes(0)
         .setNumberOfKeys(0)
         .setStateEnterTime(Time.monotonicNow())
@@ -294,35 +267,34 @@ public class ContainerStateManager implements Closeable {
     Preconditions.checkNotNull(containerInfo);
     containers.addContainer(containerInfo);
     LOG.trace("New container allocated: {}", containerInfo);
-    return new ContainerWithPipeline(containerInfo, pipeline);
+    return containerInfo;
   }
 
   /**
    * Update the Container State to the next state.
    *
-   * @param info - ContainerInfo
+   * @param containerID - ContainerID
    * @param event - LifeCycle Event
    * @return Updated ContainerInfo.
    * @throws SCMException  on Failure.
    */
-  public ContainerInfo updateContainerState(ContainerInfo
-      info, HddsProtos.LifeCycleEvent event) throws SCMException {
-    LifeCycleState newState;
+  ContainerInfo updateContainerState(final ContainerID containerID,
+      final HddsProtos.LifeCycleEvent event)
+      throws SCMException, ContainerNotFoundException {
+    final ContainerInfo info = containers.getContainerInfo(containerID);
     try {
-      newState = this.stateMachine.getNextState(info.getState(), event);
+      final LifeCycleState newState = stateMachine.getNextState(
+          info.getState(), event);
+      containers.updateState(containerID, info.getState(), newState);
+      return containers.getContainerInfo(containerID);
     } catch (InvalidStateTransitionException ex) {
       String error = String.format("Failed to update container state %s, " +
               "reason: invalid state transition from state: %s upon " +
               "event: %s.",
-          info.getContainerID(), info.getState(), event);
+          containerID, info.getState(), event);
       LOG.error(error);
       throw new SCMException(error, FAILED_TO_CHANGE_CONTAINER_STATE);
     }
-
-    // This is a post condition after executing getNextState.
-    Preconditions.checkNotNull(newState);
-    containers.updateState(info, info.getState(), newState);
-    return containers.getContainerInfo(info);
   }
 
   /**
@@ -331,10 +303,10 @@ public class ContainerStateManager implements Closeable {
    * @return  ContainerInfo
    * @throws SCMException - on Error.
    */
-  public ContainerInfo updateContainerInfo(ContainerInfo info)
-      throws SCMException {
+  ContainerInfo updateContainerInfo(final ContainerInfo info)
+      throws ContainerNotFoundException {
     containers.updateContainerInfo(info);
-    return containers.getContainerInfo(info);
+    return containers.getContainerInfo(info.containerID());
   }
 
   /**
@@ -343,11 +315,16 @@ public class ContainerStateManager implements Closeable {
    * @param deleteTransactionMap maps containerId to its new
    *                             deleteTransactionID
    */
-  public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap) {
-    for (Map.Entry<Long, Long> entry : deleteTransactionMap.entrySet()) {
-      containers.getContainerMap().get(ContainerID.valueof(entry.getKey()))
-          .updateDeleteTransactionId(entry.getValue());
-    }
+  void updateDeleteTransactionId(
+      final Map<Long, Long> deleteTransactionMap) {
+    deleteTransactionMap.forEach((k, v) -> {
+      try {
+        containers.getContainerInfo(ContainerID.valueof(k))
+            .updateDeleteTransactionId(v);
+      } catch (ContainerNotFoundException e) {
+        LOG.warn("Exception while updating delete transaction id.", e);
+      }
+    });
   }
 
   /**
@@ -360,12 +337,12 @@ public class ContainerStateManager implements Closeable {
    * @param state - State of the Container-- {Open, Allocated etc.}
    * @return ContainerInfo, null if there is no match found.
    */
-  public ContainerInfo getMatchingContainer(final long size,
+  ContainerInfo getMatchingContainer(final long size,
       String owner, ReplicationType type, ReplicationFactor factor,
       LifeCycleState state) {
 
     // Find containers that match the query spec, if no match return null.
-    NavigableSet<ContainerID> matchingSet =
+    final NavigableSet<ContainerID> matchingSet =
         containers.getMatchingContainerIDs(state, owner, factor, type);
     if (matchingSet == null || matchingSet.size() == 0) {
       return null;
@@ -373,11 +350,9 @@ public class ContainerStateManager implements Closeable {
 
     // Get the last used container and find container above the last used
     // container ID.
-    ContainerState key = new ContainerState(owner, type, factor);
-    ContainerID lastID = lastUsedMap.get(key);
-    if (lastID == null) {
-      lastID = matchingSet.first();
-    }
+    final ContainerState key = new ContainerState(owner, type, factor);
+    final ContainerID lastID = lastUsedMap
+        .getOrDefault(key, matchingSet.first());
 
     // There is a small issue here. The first time, we will skip the first
     // container. But in most cases it will not matter.
@@ -401,32 +376,47 @@ public class ContainerStateManager implements Closeable {
       resultSet = matchingSet.headSet(lastID, true);
       selectedContainer = findContainerWithSpace(size, resultSet, owner);
     }
-    // Update the allocated Bytes on this container.
-    if (selectedContainer != null) {
-      selectedContainer.updateAllocatedBytes(size);
-    }
     return selectedContainer;
 
   }
 
-  private ContainerInfo findContainerWithSpace(long size,
-      NavigableSet<ContainerID> searchSet, String owner) {
-    // Get the container with space to meet our request.
-    for (ContainerID id : searchSet) {
-      ContainerInfo containerInfo = containers.getContainerInfo(id);
-      if (containerInfo.getAllocatedBytes() + size <= this.containerSize) {
-        containerInfo.updateLastUsedTime();
-
-        ContainerState key = new ContainerState(owner,
-            containerInfo.getReplicationType(),
-            containerInfo.getReplicationFactor());
-        lastUsedMap.put(key, containerInfo.containerID());
-        return containerInfo;
+  private ContainerInfo findContainerWithSpace(final long size,
+      final NavigableSet<ContainerID> searchSet, final String owner) {
+    try {
+      // Get the container with space to meet our request.
+      for (ContainerID id : searchSet) {
+        final ContainerInfo containerInfo = containers.getContainerInfo(id);
+        if (containerInfo.getUsedBytes() + size <= this.containerSize) {
+          containerInfo.updateLastUsedTime();
+
+          final ContainerState key = new ContainerState(owner,
+              containerInfo.getReplicationType(),
+              containerInfo.getReplicationFactor());
+          lastUsedMap.put(key, containerInfo.containerID());
+          return containerInfo;
+        }
       }
+    } catch (ContainerNotFoundException e) {
+      // This should not happen!
+      LOG.warn("Exception while finding container with space", e);
     }
     return null;
   }
 
+  Set<ContainerID> getAllContainerIDs() {
+    return containers.getAllContainerIDs();
+  }
+
+  /**
+   * Returns Containers by State.
+   *
+   * @param state - State - Open, Closed etc.
+   * @return List of containers by state.
+   */
+  Set<ContainerID> getContainerIDsByState(final LifeCycleState state) {
+    return containers.getContainerIDsByState(state);
+  }
+
   /**
    * Returns a set of ContainerIDs that match the Container.
    *
@@ -436,39 +426,25 @@ public class ContainerStateManager implements Closeable {
    * @param state - Current State, like Open, Close etc.
    * @return Set of containers that match the specific query parameters.
    */
-  public NavigableSet<ContainerID> getMatchingContainerIDs(
-      String owner, ReplicationType type, ReplicationFactor factor,
-      LifeCycleState state) {
+  NavigableSet<ContainerID> getMatchingContainerIDs(final String owner,
+      final ReplicationType type, final ReplicationFactor factor,
+      final LifeCycleState state) {
     return containers.getMatchingContainerIDs(state, owner,
         factor, type);
   }
 
   /**
-   * Returns the containerInfo with pipeline for the given container id.
-   * @param selector -- Pipeline selector class.
-   * @param containerID id of the container
-   * @return ContainerInfo containerInfo
-   * @throws IOException
-   */
-  public ContainerWithPipeline getContainer(PipelineSelector selector,
-      ContainerID containerID) {
-    ContainerInfo info = containers.getContainerInfo(containerID.getId());
-    Pipeline pipeline = selector.getPipeline(info.getPipelineID());
-    return new ContainerWithPipeline(info, pipeline);
-  }
-
-  /**
    * Returns the containerInfo for the given container id.
    * @param containerID id of the container
    * @return ContainerInfo containerInfo
    * @throws IOException
    */
-  public ContainerInfo getContainer(ContainerID containerID) {
+  ContainerInfo getContainer(final ContainerID containerID)
+      throws ContainerNotFoundException {
     return containers.getContainerInfo(containerID);
   }
 
-  @Override
-  public void close() throws IOException {
+  void close() throws IOException {
   }
 
   /**
@@ -478,8 +454,8 @@ public class ContainerStateManager implements Closeable {
    * @param containerID
    * @return Set<DatanodeDetails>
    */
-  public Set<DatanodeDetails> getContainerReplicas(ContainerID containerID)
-      throws SCMException {
+  Set<ContainerReplica> getContainerReplicas(
+      final ContainerID containerID) throws ContainerNotFoundException {
     return containers.getContainerReplicas(containerID);
   }
 
@@ -487,53 +463,29 @@ public class ContainerStateManager implements Closeable {
    * Add a container Replica for given DataNode.
    *
    * @param containerID
-   * @param dn
+   * @param replica
    */
-  public void addContainerReplica(ContainerID containerID, DatanodeDetails dn) {
-    containers.addContainerReplica(containerID, dn);
+  void updateContainerReplica(final ContainerID containerID,
+      final ContainerReplica replica) throws ContainerNotFoundException {
+    containers.updateContainerReplica(containerID, replica);
   }
 
   /**
    * Remove a container Replica for given DataNode.
    *
    * @param containerID
-   * @param dn
+   * @param replica
    * @return True of dataNode is removed successfully else false.
    */
-  public boolean removeContainerReplica(ContainerID containerID,
-      DatanodeDetails dn) throws SCMException {
-    return containers.removeContainerReplica(containerID, dn);
-  }
-
-  /**
-   * Compare the existing replication number with the expected one.
-   */
-  public ReplicationRequest checkReplicationState(ContainerID containerID)
-      throws SCMException {
-    int existingReplicas = getContainerReplicas(containerID).size();
-    int expectedReplicas = getContainer(containerID)
-        .getReplicationFactor().getNumber();
-    if (existingReplicas != expectedReplicas) {
-      return new ReplicationRequest(containerID.getId(), existingReplicas,
-          expectedReplicas);
-    }
-    return null;
-  }
-
-  /**
-   * Checks if the container is open.
-   */
-  public boolean isOpen(ContainerID containerID) {
-    Preconditions.checkNotNull(containerID);
-    ContainerInfo container = Preconditions
-        .checkNotNull(getContainer(containerID),
-            "Container can't be found " + containerID);
-    return container.isContainerOpen();
+  void removeContainerReplica(final ContainerID containerID,
+      final ContainerReplica replica)
+      throws ContainerNotFoundException, ContainerReplicaNotFoundException {
+    containers.removeContainerReplica(containerID, replica);
   }
 
-  @VisibleForTesting
-  public ContainerStateMap getContainerStateMap() {
-    return containers;
+  void removeContainer(final ContainerID containerID)
+      throws ContainerNotFoundException {
+    containers.removeContainer(containerID);
   }
 
 }
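
The rewritten ContainerStateManager above allocates container IDs from an in-memory counter that loadContainer() only ever moves forward, so allocateContainer() keeps handing out IDs above anything already persisted. A minimal, self-contained sketch of that behaviour follows; the class and method names are illustrative stand-ins, not code from this patch.

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the counter handling in ContainerStateManager.
public class IdCounterSketch {
  private final AtomicLong containerCount = new AtomicLong(0);

  // Mirrors loadContainer(): clamp the counter to the largest ID replayed so far.
  void load(long existingId) {
    containerCount.set(Long.max(existingId, containerCount.get()));
  }

  // Mirrors allocateContainer(): new IDs are handed out above the loaded ones.
  long allocate() {
    return containerCount.incrementAndGet();
  }

  public static void main(String[] args) {
    IdCounterSketch sketch = new IdCounterSketch();
    sketch.load(42);                       // replay container #42 from the DB
    sketch.load(7);                        // smaller IDs never move the counter back
    System.out.println(sketch.allocate()); // prints 43
  }
}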

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
index 9d6cadb..96ad731 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -24,12 +24,10 @@ import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.SCMContainerInfo;
 import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
@@ -55,19 +53,23 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys
     .OZONE_SCM_CONTAINER_SIZE_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys
     .OZONE_SCM_CONTAINER_SIZE;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
 import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
     .FAILED_TO_CHANGE_CONTAINER_STATE;
 import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
@@ -82,10 +84,7 @@ public class SCMContainerManager implements ContainerManager {
   private static final Logger LOG = LoggerFactory.getLogger(SCMContainerManager
       .class);
 
-  private final NodeManager nodeManager;
-  private final long cacheSize;
   private final Lock lock;
-  private final Charset encoding = Charset.forName("UTF-8");
   private final MetadataStore containerStore;
   private final PipelineSelector pipelineSelector;
   private final ContainerStateManager containerStateManager;
@@ -100,113 +99,110 @@ public class SCMContainerManager implements ContainerManager {
    * @param nodeManager - NodeManager so that we can get the nodes that are
    * healthy to place new
    * containers.
-   * @param cacheSizeMB - Amount of memory reserved for the LSM tree to cache
-   * its nodes. This is
    * passed to LevelDB and this memory is allocated in Native code space.
    * CacheSize is specified
    * in MB.
    * @throws IOException on Failure.
    */
   @SuppressWarnings("unchecked")
-  public SCMContainerManager(
-      final Configuration conf, final NodeManager nodeManager, final int
-      cacheSizeMB, EventPublisher eventPublisher) throws IOException {
-    this.nodeManager = nodeManager;
-    this.cacheSize = cacheSizeMB;
-
-    File metaDir = getOzoneMetaDirPath(conf);
-
-    // Write the container name to pipeline mapping.
-    File containerDBPath = new File(metaDir, SCM_CONTAINER_DB);
-    containerStore =
-        MetadataStoreBuilder.newBuilder()
-            .setConf(conf)
-            .setDbFile(containerDBPath)
-            .setCacheSize(this.cacheSize * OzoneConsts.MB)
-            .build();
+  public SCMContainerManager(final Configuration conf,
+      final NodeManager nodeManager, final EventPublisher eventPublisher)
+      throws IOException {
 
-    this.lock = new ReentrantLock();
+    final File metaDir = getOzoneMetaDirPath(conf);
+    final File containerDBPath = new File(metaDir, SCM_CONTAINER_DB);
+    final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
+        OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
 
-    size = (long)conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE,
-        OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+    this.containerStore = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setDbFile(containerDBPath)
+        .setCacheSize(cacheSize * OzoneConsts.MB)
+        .build();
 
+    this.lock = new ReentrantLock();
+    this.size = (long) conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE,
+        OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
     this.pipelineSelector = new PipelineSelector(nodeManager,
-            conf, eventPublisher, cacheSizeMB);
-
-    this.containerStateManager =
-        new ContainerStateManager(conf, this, pipelineSelector);
-    LOG.trace("Container State Manager created.");
-
+            conf, eventPublisher, cacheSize);
+    this.containerStateManager = new ContainerStateManager(conf);
     this.eventPublisher = eventPublisher;
 
-    long containerCreationLeaseTimeout = conf.getTimeDuration(
+    final long containerCreationLeaseTimeout = conf.getTimeDuration(
         ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT,
         ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT,
         TimeUnit.MILLISECONDS);
-    containerLeaseManager = new LeaseManager<>("ContainerCreation",
+    this.containerLeaseManager = new LeaseManager<>("ContainerCreation",
         containerCreationLeaseTimeout);
-    containerLeaseManager.start();
+    this.containerLeaseManager.start();
+
     loadExistingContainers();
   }
 
-  private void loadExistingContainers() {
-
-    List<ContainerInfo> containerList;
-    try {
-      containerList = listContainer(0, Integer.MAX_VALUE);
-
-      // if there are no container to load, let us return.
-      if (containerList == null || containerList.size() == 0) {
-        LOG.info("No containers to load for this cluster.");
-        return;
-      }
-    } catch (IOException e) {
-      if (!e.getMessage().equals("No container exists in current db")) {
-        LOG.error("Could not list the containers", e);
-      }
-      return;
+  private void loadExistingContainers() throws IOException {
+    List<Map.Entry<byte[], byte[]>> range = containerStore
+        .getSequentialRangeKVs(null, Integer.MAX_VALUE, null);
+    for (Map.Entry<byte[], byte[]> entry : range) {
+      ContainerInfo container = ContainerInfo.fromProtobuf(
+          HddsProtos.SCMContainerInfo.PARSER.parseFrom(entry.getValue()));
+      Preconditions.checkNotNull(container);
+      containerStateManager.loadContainer(container);
+      pipelineSelector.addContainerToPipeline(
+          container.getPipelineID(), container.getContainerID());
     }
+  }
 
+  @VisibleForTesting
+  // TODO: remove this later.
+  public ContainerStateManager getContainerStateManager() {
+    return containerStateManager;
+  }
+
+  @Override
+  public List<ContainerInfo> getContainers() {
+    lock.lock();
     try {
-      for (ContainerInfo container : containerList) {
-        containerStateManager.addExistingContainer(container);
-        pipelineSelector.addContainerToPipeline(
-            container.getPipelineID(), container.getContainerID());
-      }
-    } catch (SCMException ex) {
-      LOG.error("Unable to create a container information. ", ex);
-      // Fix me, what is the proper shutdown procedure for SCM ??
-      // System.exit(1) // Should we exit here?
+      return containerStateManager.getAllContainerIDs().stream().map(id -> {
+        try {
+          return containerStateManager.getContainer(id);
+        } catch (ContainerNotFoundException e) {
+          // How can this happen?
+          return null;
+        }
+      }).filter(Objects::nonNull).collect(Collectors.toList());
+    } finally {
+      lock.unlock();
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
-  public ContainerInfo getContainer(final long containerID) throws
-      IOException {
-    ContainerInfo containerInfo;
+  public List<ContainerInfo> getContainers(LifeCycleState state) {
     lock.lock();
     try {
-      byte[] containerBytes = containerStore.get(
-          Longs.toByteArray(containerID));
-      if (containerBytes == null) {
-        throw new SCMException(
-            "Specified key does not exist. key : " + containerID,
-            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
-      }
-
-      HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER
-          .parseFrom(containerBytes);
-      containerInfo = ContainerInfo.fromProtobuf(temp);
-      return containerInfo;
+      return containerStateManager.getContainerIDsByState(state).stream()
+          .map(id -> {
+            try {
+              return containerStateManager.getContainer(id);
+            } catch (ContainerNotFoundException e) {
+              // How can this happen?
+              return null;
+            }
+          }).filter(Objects::nonNull).collect(Collectors.toList());
     } finally {
       lock.unlock();
     }
   }
 
   /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ContainerInfo getContainer(final ContainerID containerID)
+      throws ContainerNotFoundException {
+    return containerStateManager.getContainer(containerID);
+  }
+
+  /**
    * Returns the ContainerInfo and pipeline from the containerID. If container
    * has no available replicas in datanodes it returns pipeline with no
    * datanodes and empty leaderID . Pipeline#isEmpty can be used to check for
@@ -217,38 +213,29 @@ public class SCMContainerManager implements ContainerManager {
    * @throws IOException
    */
   @Override
-  public ContainerWithPipeline getContainerWithPipeline(long containerID)
-      throws IOException {
-    ContainerInfo contInfo;
+  public ContainerWithPipeline getContainerWithPipeline(ContainerID containerID)
+      throws ContainerNotFoundException {
     lock.lock();
     try {
-      byte[] containerBytes = containerStore.get(
-          Longs.toByteArray(containerID));
-      if (containerBytes == null) {
-        throw new SCMException(
-            "Specified key does not exist. key : " + containerID,
-            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
-      }
-      HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER
-          .parseFrom(containerBytes);
-      contInfo = ContainerInfo.fromProtobuf(temp);
-
+      final ContainerInfo contInfo = getContainer(containerID);
       Pipeline pipeline;
       String leaderId = "";
-      if (contInfo.isContainerOpen()) {
+      if (contInfo.isOpen()) {
         // If pipeline with given pipeline Id already exist return it
         pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID());
       } else {
         // For close containers create pipeline from datanodes with replicas
-        Set<DatanodeDetails> dnWithReplicas = containerStateManager
+        Set<ContainerReplica> dnWithReplicas = containerStateManager
             .getContainerReplicas(contInfo.containerID());
         if (!dnWithReplicas.isEmpty()) {
-          leaderId = dnWithReplicas.iterator().next().getUuidString();
+          leaderId = dnWithReplicas.iterator().next()
+              .getDatanodeDetails().getUuidString();
         }
         pipeline = new Pipeline(leaderId, contInfo.getState(),
             ReplicationType.STAND_ALONE, contInfo.getReplicationFactor(),
             PipelineID.randomId());
-        dnWithReplicas.forEach(pipeline::addMember);
+        dnWithReplicas.stream().map(ContainerReplica::getDatanodeDetails).
+            forEach(pipeline::addMember);
       }
       return new ContainerWithPipeline(contInfo, pipeline);
     } finally {
@@ -260,33 +247,32 @@ public class SCMContainerManager implements ContainerManager {
    * {@inheritDoc}
    */
   @Override
-  public List<ContainerInfo> listContainer(long startContainerID,
-      int count) throws IOException {
-    List<ContainerInfo> containerList = new ArrayList<>();
+  public List<ContainerInfo> listContainer(ContainerID startContainerID,
+      int count) {
     lock.lock();
     try {
-      if (containerStore.isEmpty()) {
-        throw new IOException("No container exists in current db");
-      }
-      byte[] startKey = startContainerID <= 0 ? null :
-          Longs.toByteArray(startContainerID);
-      List<Map.Entry<byte[], byte[]>> range =
-          containerStore.getSequentialRangeKVs(startKey, count, null);
-
-      // Transform the values into the pipelines.
-      // TODO: filter by container state
-      for (Map.Entry<byte[], byte[]> entry : range) {
-        ContainerInfo containerInfo =
-            ContainerInfo.fromProtobuf(
-                HddsProtos.SCMContainerInfo.PARSER.parseFrom(
-                    entry.getValue()));
-        Preconditions.checkNotNull(containerInfo);
-        containerList.add(containerInfo);
-      }
+      final long startId = startContainerID == null ?
+          0 : startContainerID.getId();
+      final List<ContainerID> containersIds =
+          new ArrayList<>(containerStateManager.getAllContainerIDs());
+      Collections.sort(containersIds);
+
+      return containersIds.stream()
+          .filter(id -> id.getId() > startId)
+          .limit(count)
+          .map(id -> {
+            try {
+              return containerStateManager.getContainer(id);
+            } catch (ContainerNotFoundException ex) {
+              // This can never happen, as we hold lock no one else can remove
+              // the container after we got the container ids.
+              LOG.warn("Container Missing.", ex);
+              return null;
+            }
+          }).collect(Collectors.toList());
     } finally {
       lock.unlock();
     }
-    return containerList;
   }
 
   /**
@@ -298,29 +284,35 @@ public class SCMContainerManager implements ContainerManager {
    * @throws IOException - Exception
    */
   @Override
-  public ContainerWithPipeline allocateContainer(
-      ReplicationType type,
-      ReplicationFactor replicationFactor,
-      String owner)
+  public ContainerWithPipeline allocateContainer(final ReplicationType type,
+      final ReplicationFactor replicationFactor, final String owner)
       throws IOException {
-
-    ContainerInfo containerInfo;
-    ContainerWithPipeline containerWithPipeline;
-
     lock.lock();
     try {
-      containerWithPipeline = containerStateManager.allocateContainer(
-              pipelineSelector, type, replicationFactor, owner);
-      containerInfo = containerWithPipeline.getContainerInfo();
-
-      byte[] containerIDBytes = Longs.toByteArray(
-          containerInfo.getContainerID());
-      containerStore.put(containerIDBytes, containerInfo.getProtobuf()
-              .toByteArray());
+      final ContainerInfo containerInfo; containerInfo = containerStateManager
+          .allocateContainer(pipelineSelector, type, replicationFactor, owner);
+      final Pipeline pipeline = pipelineSelector.getPipeline(
+          containerInfo.getPipelineID());
+
+      try {
+        final byte[] containerIDBytes = Longs.toByteArray(
+            containerInfo.getContainerID());
+        containerStore.put(containerIDBytes,
+            containerInfo.getProtobuf().toByteArray());
+      } catch (IOException ex) {
+        // If adding to containerStore fails, we should remove the container
+        // from in-memory map.
+        try {
+          containerStateManager.removeContainer(containerInfo.containerID());
+        } catch (ContainerNotFoundException cnfe) {
+          // No need to worry much, everything is going as planned.
+        }
+        throw ex;
+      }
+      return new ContainerWithPipeline(containerInfo, pipeline);
     } finally {
       lock.unlock();
     }
-    return containerWithPipeline;
   }
 
   /**
@@ -332,18 +324,24 @@ public class SCMContainerManager implements ContainerManager {
    *                     specified key.
    */
   @Override
-  public void deleteContainer(long containerID) throws IOException {
+  public void deleteContainer(ContainerID containerID) throws IOException {
     lock.lock();
     try {
-      byte[] dbKey = Longs.toByteArray(containerID);
-      byte[] containerBytes = containerStore.get(dbKey);
-      if (containerBytes == null) {
-        throw new SCMException(
-            "Failed to delete container " + containerID + ", reason : " +
-                "container doesn't exist.",
-            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+      containerStateManager.removeContainer(containerID);
+      final byte[] dbKey = Longs.toByteArray(containerID.getId());
+      final byte[] containerBytes = containerStore.get(dbKey);
+      if (containerBytes != null) {
+        containerStore.delete(dbKey);
+      } else {
+        // Where did the container go? o_O
+        LOG.warn("Unable to remove the container {} from container store," +
+                " it's missing!", containerID);
       }
-      containerStore.delete(dbKey);
+    } catch (ContainerNotFoundException cnfe) {
+      throw new SCMException(
+          "Failed to delete container " + containerID + ", reason : " +
+              "container doesn't exist.",
+          SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
     } finally {
       lock.unlock();
     }
@@ -354,40 +352,51 @@ public class SCMContainerManager implements ContainerManager {
    */
   @Override
   public HddsProtos.LifeCycleState updateContainerState(
-      long containerID, HddsProtos.LifeCycleEvent event) throws
-      IOException {
-    ContainerInfo containerInfo;
+      ContainerID containerID, HddsProtos.LifeCycleEvent event)
+      throws IOException {
+    // Should we return the updated ContainerInfo instead of LifeCycleState?
     lock.lock();
     try {
-      byte[] dbKey = Longs.toByteArray(containerID);
-      byte[] containerBytes = containerStore.get(dbKey);
-      if (containerBytes == null) {
-        throw new SCMException(
-            "Failed to update container state"
-                + containerID
-                + ", reason : container doesn't exist.",
-            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+      ContainerInfo updatedContainer =
+          updateContainerStateInternal(containerID, event);
+      if (!updatedContainer.isOpen()) {
+        pipelineSelector.removeContainerFromPipeline(
+            updatedContainer.getPipelineID(), containerID.getId());
       }
-      containerInfo =
-          ContainerInfo.fromProtobuf(HddsProtos.SCMContainerInfo.PARSER
-              .parseFrom(containerBytes));
+      final byte[] dbKey = Longs.toByteArray(containerID.getId());
+      containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
+      return updatedContainer.getState();
+    } catch (ContainerNotFoundException cnfe) {
+      throw new SCMException(
+          "Failed to update container state"
+              + containerID
+              + ", reason : container doesn't exist.",
+          SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+    } finally {
+      lock.unlock();
+    }
+  }
 
-      Preconditions.checkNotNull(containerInfo);
+  private ContainerInfo updateContainerStateInternal(ContainerID containerID,
+      HddsProtos.LifeCycleEvent event) throws IOException {
+    // Refactor the below code for better clarity.
+    try {
+      final ContainerInfo info =
+          containerStateManager.getContainer(containerID);
       switch (event) {
       case CREATE:
         // Acquire lease on container
         Lease<ContainerInfo> containerLease =
-            containerLeaseManager.acquire(containerInfo);
+            containerLeaseManager.acquire(info);
         // Register callback to be executed in case of timeout
         containerLease.registerCallBack(() -> {
           updateContainerState(containerID,
               HddsProtos.LifeCycleEvent.TIMEOUT);
-          return null;
-        });
+          return null; });
         break;
       case CREATED:
         // Release the lease on container
-        containerLeaseManager.release(containerInfo);
+        containerLeaseManager.release(info);
         break;
       case FINALIZE:
         // TODO: we don't need a lease manager here for closing as the
@@ -412,28 +421,20 @@ public class SCMContainerManager implements ContainerManager {
       // If the below updateContainerState call fails, we should revert the
       // changes made in switch case.
       // Like releasing the lease in case of BEGIN_CREATE.
-      ContainerInfo updatedContainer = containerStateManager
-          .updateContainerState(containerInfo, event);
-      if (!updatedContainer.isContainerOpen()) {
-        pipelineSelector.removeContainerFromPipeline(
-                containerInfo.getPipelineID(), containerID);
-      }
-      containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
-      return updatedContainer.getState();
+      return containerStateManager.updateContainerState(containerID, event);
     } catch (LeaseException e) {
       throw new IOException("Lease Exception.", e);
-    } finally {
-      lock.unlock();
     }
   }
 
-  /**
-   * Update deleteTransactionId according to deleteTransactionMap.
-   *
-   * @param deleteTransactionMap Maps the containerId to latest delete
-   *                             transaction id for the container.
-   * @throws IOException
-   */
+
+    /**
+     * Update deleteTransactionId according to deleteTransactionMap.
+     *
+     * @param deleteTransactionMap Maps the containerId to latest delete
+     *                             transaction id for the container.
+     * @throws IOException
+     */
   public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
       throws IOException {
     if (deleteTransactionMap == null) {
@@ -467,16 +468,6 @@ public class SCMContainerManager implements ContainerManager {
   }
 
   /**
-   * Returns the container State Manager.
-   *
-   * @return ContainerStateManager
-   */
-  @Override
-  public ContainerStateManager getStateManager() {
-    return containerStateManager;
-  }
-
-  /**
    * Return a container matching the attributes specified.
    *
    * @param sizeRequired - Space needed in the Container.
@@ -489,7 +480,7 @@ public class SCMContainerManager implements ContainerManager {
   public ContainerWithPipeline getMatchingContainerWithPipeline(
       final long sizeRequired, String owner, ReplicationType type,
       ReplicationFactor factor, LifeCycleState state) throws IOException {
-    ContainerInfo containerInfo = getStateManager()
+    ContainerInfo containerInfo = containerStateManager
         .getMatchingContainer(sizeRequired, owner, type, factor, state);
     if (containerInfo == null) {
       return null;
@@ -518,70 +509,45 @@ public class SCMContainerManager implements ContainerManager {
    */
   @Override
   public void processContainerReports(DatanodeDetails datanodeDetails,
-      ContainerReportsProto reports, boolean isRegisterCall)
-      throws IOException {
+      ContainerReportsProto reports) throws IOException {
     List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
         containerInfos = reports.getReportsList();
     PendingDeleteStatusList pendingDeleteStatusList =
         new PendingDeleteStatusList(datanodeDetails);
-    for (StorageContainerDatanodeProtocolProtos.ContainerInfo contInfo :
+    for (StorageContainerDatanodeProtocolProtos.ContainerInfo newInfo :
         containerInfos) {
-      // Update replica info during registration process.
-      if (isRegisterCall) {
-        try {
-          getStateManager().addContainerReplica(ContainerID.
-              valueof(contInfo.getContainerID()), datanodeDetails);
-        } catch (Exception ex) {
-          // Continue to next one after logging the error.
-          LOG.error("Error while adding replica for containerId {}.",
-              contInfo.getContainerID(), ex);
-        }
-      }
-      byte[] dbKey = Longs.toByteArray(contInfo.getContainerID());
+      ContainerID id = ContainerID.valueof(newInfo.getContainerID());
+      ContainerReplica replica = ContainerReplica.newBuilder()
+          .setContainerID(id)
+          .setDatanodeDetails(datanodeDetails)
+          .setOriginNodeId(datanodeDetails.getUuid())
+          .build();
       lock.lock();
       try {
-        byte[] containerBytes = containerStore.get(dbKey);
-        if (containerBytes != null) {
-          HddsProtos.SCMContainerInfo knownState =
-              HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes);
-
-          if (knownState.getState() == LifeCycleState.CLOSING
-              && contInfo.getState() == LifeCycleState.CLOSED) {
-
-            updateContainerState(contInfo.getContainerID(),
-                LifeCycleEvent.CLOSE);
-
-            //reread the container
-            knownState =
-                HddsProtos.SCMContainerInfo.PARSER
-                    .parseFrom(containerStore.get(dbKey));
-          }
-
-          HddsProtos.SCMContainerInfo newState =
-              reconcileState(contInfo, knownState, datanodeDetails);
-
-          if (knownState.getDeleteTransactionId() > contInfo
-              .getDeleteTransactionId()) {
-            pendingDeleteStatusList
-                .addPendingDeleteStatus(contInfo.getDeleteTransactionId(),
-                    knownState.getDeleteTransactionId(),
-                    knownState.getContainerID());
-          }
-
-          // FIX ME: This can be optimized, we write twice to memory, where a
-          // single write would work well.
-          //
-          // We need to write this to DB again since the closed only write
-          // the updated State.
-          containerStore.put(dbKey, newState.toByteArray());
-
-        } else {
-          // Container not found in our container db.
-          LOG.error("Error while processing container report from datanode :" +
-                  " {}, for container: {}, reason: container doesn't exist in" +
-                  "container database.", datanodeDetails,
-              contInfo.getContainerID());
+        containerStateManager.updateContainerReplica(id, replica);
+        ContainerInfo currentInfo = containerStateManager.getContainer(id);
+        if (newInfo.getState() == LifeCycleState.CLOSING
+            && currentInfo.getState() == LifeCycleState.CLOSED) {
+          currentInfo = updateContainerStateInternal(id, LifeCycleEvent.CLOSE);
         }
+
+        HddsProtos.SCMContainerInfo newState =
+            reconcileState(newInfo, currentInfo);
+
+        if (currentInfo.getDeleteTransactionId() >
+            newInfo.getDeleteTransactionId()) {
+          pendingDeleteStatusList
+                .addPendingDeleteStatus(newInfo.getDeleteTransactionId(),
+                    currentInfo.getDeleteTransactionId(),
+                    currentInfo.getContainerID());
+        }
+        containerStateManager.updateContainerInfo(
+            ContainerInfo.fromProtobuf(newState));
+        containerStore.put(id.getBytes(), newState.toByteArray());
+      } catch (ContainerNotFoundException e) {
+        LOG.error("Error while processing container report from datanode :" +
+                " {}, for container: {}, reason: container doesn't exist in" +
+                "container database.", datanodeDetails, id);
       } finally {
         lock.unlock();
       }
@@ -598,36 +564,21 @@ public class SCMContainerManager implements ContainerManager {
    *
    * @param datanodeState - State from the Datanode.
    * @param knownState - State inside SCM.
-   * @param dnDetails
    * @return new SCM State for this container.
    */
   private HddsProtos.SCMContainerInfo reconcileState(
       StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState,
-      SCMContainerInfo knownState, DatanodeDetails dnDetails) {
+      ContainerInfo knownState) {
     HddsProtos.SCMContainerInfo.Builder builder =
         HddsProtos.SCMContainerInfo.newBuilder();
     builder.setContainerID(knownState.getContainerID())
-        .setPipelineID(knownState.getPipelineID())
+        .setPipelineID(knownState.getPipelineID().getProtobuf())
+        .setState(knownState.getState())
         .setReplicationType(knownState.getReplicationType())
-        .setReplicationFactor(knownState.getReplicationFactor());
-
-    // TODO: If current state doesn't have this DN in list of DataNodes with
-    // replica then add it in list of replicas.
-
-    // If used size is greater than allocated size, we will be updating
-    // allocated size with used size. This update is done as a fallback
-    // mechanism in case SCM crashes without properly updating allocated
-    // size. Correct allocated value will be updated by
-    // ContainerStateManager during SCM shutdown.
-    long usedSize = datanodeState.getUsed();
-    long allocated = knownState.getAllocatedBytes() > usedSize ?
-        knownState.getAllocatedBytes() : usedSize;
-    builder.setAllocatedBytes(allocated)
-        .setUsedBytes(usedSize)
+        .setReplicationFactor(knownState.getReplicationFactor())
+        .setUsedBytes(datanodeState.getUsed())
         .setNumberOfKeys(datanodeState.getKeyCount())
-        .setState(knownState.getState())
         .setStateEnterTime(knownState.getStateEnterTime())
-        .setContainerID(knownState.getContainerID())
         .setDeleteTransactionId(knownState.getDeleteTransactionId());
     if (knownState.getOwner() != null) {
       builder.setOwner(knownState.getOwner());
@@ -635,20 +586,40 @@ public class SCMContainerManager implements ContainerManager {
     return builder.build();
   }
 
+  /**
+   * Returns the latest list of DataNodes where replica for given containerId
+   * exist. Throws an SCMException if no entry is found for given containerId.
+   *
+   * @param containerID
+   * @return Set<DatanodeDetails>
+   */
+  public Set<ContainerReplica> getContainerReplicas(
+      final ContainerID containerID) throws ContainerNotFoundException {
+    return containerStateManager.getContainerReplicas(containerID);
+  }
 
   /**
-   * In Container is in closed state, if it is in closed, Deleting or Deleted
-   * State.
+   * Add a container Replica for given DataNode.
    *
-   * @param info - ContainerInfo.
-   * @return true if is in open state, false otherwise
+   * @param containerID
+   * @param replica
    */
-  private boolean shouldClose(ContainerInfo info) {
-    return info.getState() == HddsProtos.LifeCycleState.OPEN;
+  public void updateContainerReplica(final ContainerID containerID,
+      final ContainerReplica replica) throws ContainerNotFoundException {
+    containerStateManager.updateContainerReplica(containerID, replica);
   }
 
-  private boolean isClosed(ContainerInfo info) {
-    return info.getState() == HddsProtos.LifeCycleState.CLOSED;
+  /**
+   * Remove a container Replica for given DataNode.
+   *
+   * @param containerID
+   * @param replica
+   * @return True of dataNode is removed successfully else false.
+   */
+  public void removeContainerReplica(final ContainerID containerID,
+      final ContainerReplica replica)
+      throws ContainerNotFoundException, ContainerReplicaNotFoundException {
+    containerStateManager.removeContainerReplica(containerID, replica);
   }
 
   /**
@@ -671,7 +642,6 @@ public class SCMContainerManager implements ContainerManager {
       containerLeaseManager.shutdown();
     }
     if (containerStateManager != null) {
-      flushContainerInfo();
       containerStateManager.close();
     }
     if (containerStore != null) {
@@ -683,48 +653,6 @@ public class SCMContainerManager implements ContainerManager {
     }
   }
 
-  /**
-   * Since allocatedBytes of a container is only in memory, stored in
-   * containerStateManager, when closing SCMContainerManager, we need to update
-   * this in the container store.
-   *
-   * @throws IOException on failure.
-   */
-  @VisibleForTesting
-  public void flushContainerInfo() throws IOException {
-    List<ContainerInfo> containers = containerStateManager.getAllContainers();
-    List<Long> failedContainers = new ArrayList<>();
-    for (ContainerInfo info : containers) {
-      // even if some container updated failed, others can still proceed
-      try {
-        byte[] dbKey = Longs.toByteArray(info.getContainerID());
-        byte[] containerBytes = containerStore.get(dbKey);
-        // TODO : looks like when a container is deleted, the container is
-        // removed from containerStore but not containerStateManager, so it can
-        // return info of a deleted container. may revisit this in the future,
-        // for now, just skip a not-found container
-        if (containerBytes != null) {
-          containerStore.put(dbKey, info.getProtobuf().toByteArray());
-        } else {
-          LOG.debug("Container state manager has container {} but not found " +
-                  "in container store, a deleted container?",
-              info.getContainerID());
-        }
-      } catch (IOException ioe) {
-        failedContainers.add(info.getContainerID());
-      }
-    }
-    if (!failedContainers.isEmpty()) {
-      throw new IOException("Error in flushing container info from container " +
-          "state manager: " + failedContainers);
-    }
-  }
-
-  @VisibleForTesting
-  public MetadataStore getContainerStore() {
-    return containerStore;
-  }
-
   public PipelineSelector getPipelineSelector() {
     return pipelineSelector;
   }
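
In the reworked SCMContainerManager above, reads are served from the in-memory ContainerStateManager and the container store is only written to; allocateContainer() therefore removes the in-memory entry again if the store write fails. A minimal sketch of that allocate-then-roll-back pattern, with a plain Map and a persist() method standing in for the in-memory state and the LevelDB containerStore (hypothetical names, not patch code):

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class AllocateRollbackSketch {
  private final Map<Long, String> inMemory = new HashMap<>();
  private long nextId = 0;

  long allocate(boolean failPersist) throws IOException {
    final long id = ++nextId;
    inMemory.put(id, "ALLOCATED");        // update the in-memory state first
    try {
      persist(id, failPersist);           // then write to the container store
    } catch (IOException ex) {
      inMemory.remove(id);                // roll back the in-memory entry
      throw ex;
    }
    return id;
  }

  // Stand-in for containerStore.put(); may fail like any DB write.
  private void persist(long id, boolean fail) throws IOException {
    if (fail) {
      throw new IOException("container store write failed for " + id);
    }
  }

  public static void main(String[] args) throws IOException {
    AllocateRollbackSketch mgr = new AllocateRollbackSketch();
    System.out.println(mgr.allocate(false));            // 1
    try {
      mgr.allocate(true);
    } catch (IOException expected) {
      System.out.println(mgr.inMemory.containsKey(2L)); // false - rolled back
    }
  }
}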

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index ddecdbc..8c11e84 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -22,12 +22,14 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ThreadFactory;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .ContainerPlacementPolicy;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
@@ -64,14 +66,14 @@ public class ReplicationManager implements Runnable {
 
   private boolean running = true;
 
-  private ContainerStateManager containerStateManager;
+  private ContainerManager containerManager;
 
   public ReplicationManager(ContainerPlacementPolicy containerPlacement,
-      ContainerStateManager containerStateManager, EventQueue eventQueue,
+      ContainerManager containerManager, EventQueue eventQueue,
       LeaseManager<Long> commandWatcherLeaseManager) {
 
     this.containerPlacement = containerPlacement;
-    this.containerStateManager = containerStateManager;
+    this.containerManager = containerManager;
     this.eventPublisher = eventQueue;
 
     this.replicationCommandWatcher =
@@ -106,7 +108,7 @@ public class ReplicationManager implements Runnable {
 
         ContainerID containerID = new ContainerID(request.getContainerId());
         ContainerInfo containerInfo =
-            containerStateManager.getContainer(containerID);
+            containerManager.getContainer(containerID);
 
         Preconditions.checkNotNull(containerInfo,
             "No information about the container " + request.getContainerId());
@@ -116,10 +118,10 @@ public class ReplicationManager implements Runnable {
                 "Container should be in closed state");
 
         //check the current replication
-        List<DatanodeDetails> datanodesWithReplicas =
+        List<ContainerReplica> containerReplicas =
             new ArrayList<>(getCurrentReplicas(request));
 
-        if (datanodesWithReplicas.size() == 0) {
+        if (containerReplicas.size() == 0) {
           LOG.warn(
               "Container {} should be replicated but can't find any existing "
                   + "replicas",
@@ -134,21 +136,23 @@ public class ReplicationManager implements Runnable {
             .size();
 
         int deficit =
-            request.getExpecReplicationCount() - datanodesWithReplicas.size()
+            request.getExpecReplicationCount() - containerReplicas.size()
                 - inFlightReplications;
 
         if (deficit > 0) {
 
+          List<DatanodeDetails> datanodes = containerReplicas.stream()
+              .map(ContainerReplica::getDatanodeDetails)
+              .collect(Collectors.toList());
           List<DatanodeDetails> selectedDatanodes = containerPlacement
-              .chooseDatanodes(datanodesWithReplicas, deficit,
+              .chooseDatanodes(datanodes, deficit,
                   containerInfo.getUsedBytes());
 
           //send the command
           for (DatanodeDetails datanode : selectedDatanodes) {
 
             ReplicateContainerCommand replicateCommand =
-                new ReplicateContainerCommand(containerID.getId(),
-                    datanodesWithReplicas);
+                new ReplicateContainerCommand(containerID.getId(), datanodes);
 
             eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
                 new CommandForDatanode<>(
@@ -174,9 +178,9 @@ public class ReplicationManager implements Runnable {
   }
 
   @VisibleForTesting
-  protected Set<DatanodeDetails> getCurrentReplicas(ReplicationRequest request)
+  protected Set<ContainerReplica> getCurrentReplicas(ReplicationRequest request)
       throws IOException {
-    return containerStateManager
+    return containerManager
         .getContainerReplicas(new ContainerID(request.getContainerId()));
   }
 
@@ -234,7 +238,11 @@ public class ReplicationManager implements Runnable {
     }
   }
 
-  public static class ReplicationCompleted implements IdentifiableEventPayload {
+  /**
+   * Event payload that signals the completion of a container replication.
+   */
+  public static class ReplicationCompleted
+      implements IdentifiableEventPayload {
 
     private final long uuid;
 
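Note on the change above: ReplicationManager now gets a Set<ContainerReplica> back from ContainerManager, so the replica set has to be mapped to DatanodeDetails before the placement policy is consulted, and the replication deficit is computed from the expected count, the live replicas, and the replications already in flight. A minimal standalone sketch of that flow follows; the Replica class and plain UUIDs are hypothetical stand-ins for the HDDS types, not the actual SCM API.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.UUID;
    import java.util.stream.Collectors;

    public final class ReplicationDeficitSketch {

      // Hypothetical stand-in for ContainerReplica; only the datanode id
      // matters for this illustration.
      static final class Replica {
        final UUID datanode;
        Replica(UUID datanode) { this.datanode = datanode; }
      }

      // The replica set is reduced to a plain datanode list, which is what the
      // placement policy and the replicate command payload expect.
      static List<UUID> datanodesOf(Set<Replica> replicas) {
        return replicas.stream()
            .map(r -> r.datanode)
            .collect(Collectors.toList());
      }

      // Deficit = expected count minus live replicas minus replications that
      // are already in flight.
      static int deficit(int expected, Set<Replica> replicas, int inFlight) {
        return expected - replicas.size() - inFlight;
      }

      public static void main(String[] args) {
        Set<Replica> replicas =
            new HashSet<>(Arrays.asList(new Replica(UUID.randomUUID())));
        System.out.println(datanodesOf(replicas).size()); // 1
        System.out.println(deficit(3, replicas, 1));      // 1 more copy needed
      }
    }

If the deficit comes out positive, the patched loop asks the placement policy for that many new datanodes and fires one ReplicateContainerCommand per selected node.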

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index 880a715..b8052a4 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -20,19 +20,21 @@ package org.apache.hadoop.hdds.scm.container.states;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashSet;
+import java.util.Set;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -46,8 +48,6 @@ import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
     .CONTAINER_EXISTS;
 import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
     .FAILED_TO_CHANGE_CONTAINER_STATE;
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
-    .FAILED_TO_FIND_CONTAINER;
 
 /**
  * Container State Map acts like a unified map for various attributes that are
@@ -84,16 +84,15 @@ public class ContainerStateMap {
   private static final Logger LOG =
       LoggerFactory.getLogger(ContainerStateMap.class);
 
+  private final static NavigableSet<ContainerID> EMPTY_SET  =
+      Collections.unmodifiableNavigableSet(new TreeSet<>());
+
   private final ContainerAttribute<LifeCycleState> lifeCycleStateMap;
   private final ContainerAttribute<String> ownerMap;
   private final ContainerAttribute<ReplicationFactor> factorMap;
   private final ContainerAttribute<ReplicationType> typeMap;
-
   private final Map<ContainerID, ContainerInfo> containerMap;
-  // Map to hold replicas of given container.
-  private final Map<ContainerID, Set<DatanodeDetails>> contReplicaMap;
-  private final static NavigableSet<ContainerID> EMPTY_SET  =
-      Collections.unmodifiableNavigableSet(new TreeSet<>());
+  private final Map<ContainerID, Set<ContainerReplica>> replicaMap;
   private final Map<ContainerQueryKey, NavigableSet<ContainerID>> resultCache;
 
   // Container State Map lock should be held before calling into
@@ -105,18 +104,14 @@ public class ContainerStateMap {
    * Create a ContainerStateMap.
    */
   public ContainerStateMap() {
-    lifeCycleStateMap = new ContainerAttribute<>();
-    ownerMap = new ContainerAttribute<>();
-    factorMap = new ContainerAttribute<>();
-    typeMap = new ContainerAttribute<>();
-    containerMap = new HashMap<>();
-    lock = new ReentrantReadWriteLock();
-    contReplicaMap = new HashMap<>();
-//        new InstrumentedLock(getClass().getName(), LOG,
-//            new ReentrantLock(),
-//            1000,
-//            300));
-    resultCache = new ConcurrentHashMap<>();
+    this.lifeCycleStateMap = new ContainerAttribute<>();
+    this.ownerMap = new ContainerAttribute<>();
+    this.factorMap = new ContainerAttribute<>();
+    this.typeMap = new ContainerAttribute<>();
+    this.containerMap = new HashMap<>();
+    this.lock = new ReentrantReadWriteLock();
+    this.replicaMap = new HashMap<>();
+    this.resultCache = new ConcurrentHashMap<>();
   }
 
   /**
@@ -125,7 +120,7 @@ public class ContainerStateMap {
    * @param info - container info
    * @throws SCMException - throws if create failed.
    */
-  public void addContainer(ContainerInfo info)
+  public void addContainer(final ContainerInfo info)
       throws SCMException {
     Preconditions.checkNotNull(info, "Container Info cannot be null");
     Preconditions.checkArgument(info.getReplicationFactor().getNumber() > 0,
@@ -133,7 +128,7 @@ public class ContainerStateMap {
 
     lock.writeLock().lock();
     try {
-      ContainerID id = ContainerID.valueof(info.getContainerID());
+      final ContainerID id = info.containerID();
       if (containerMap.putIfAbsent(id, info) != null) {
         LOG.debug("Duplicate container ID detected. {}", id);
         throw new
@@ -145,6 +140,7 @@ public class ContainerStateMap {
       ownerMap.insert(info.getOwner(), id);
       factorMap.insert(info.getReplicationFactor(), id);
       typeMap.insert(info.getReplicationType(), id);
+      replicaMap.put(id, new HashSet<>());
 
       // Flush the cache of this container type, will be added later when
       // get container queries are executed.
@@ -156,23 +152,30 @@ public class ContainerStateMap {
   }
 
   /**
-   * Returns the latest state of Container from SCM's Container State Map.
+   * Removes a Container Entry from ContainerStateMap.
    *
-   * @param info - ContainerInfo
-   * @return ContainerInfo
-   */
-  public ContainerInfo getContainerInfo(ContainerInfo info) {
-    return getContainerInfo(info.getContainerID());
-  }
-
-  /**
-   * Returns the latest state of Container from SCM's Container State Map.
-   *
-   * @param containerID - int
-   * @return container info, if found.
+   * @param containerID - ContainerID
+   * @throws ContainerNotFoundException - if the container does not exist.
    */
-  public ContainerInfo getContainerInfo(long containerID) {
-    return getContainerInfo(ContainerID.valueof(containerID));
+  public void removeContainer(final ContainerID containerID)
+      throws ContainerNotFoundException {
+    Preconditions.checkNotNull(containerID, "ContainerID cannot be null");
+    lock.writeLock().lock();
+    try {
+      checkIfContainerExist(containerID);
+      // Should we revert back to the original state if any of the below
+      // remove operation fails?
+      final ContainerInfo info = containerMap.remove(containerID);
+      lifeCycleStateMap.remove(info.getState(), containerID);
+      ownerMap.remove(info.getOwner(), containerID);
+      factorMap.remove(info.getReplicationFactor(), containerID);
+      typeMap.remove(info.getReplicationType(), containerID);
+      // Flush the cache of this container type.
+      flushCache(info);
+      LOG.trace("Removed container with {} successfully.", containerID);
+    } finally {
+      lock.writeLock().unlock();
+    }
   }
 
   /**
@@ -181,9 +184,11 @@ public class ContainerStateMap {
    * @param containerID - ContainerID
    * @return container info, if found.
    */
-  public ContainerInfo getContainerInfo(ContainerID containerID) {
+  public ContainerInfo getContainerInfo(final ContainerID containerID)
+      throws ContainerNotFoundException {
     lock.readLock().lock();
     try {
+      checkIfContainerExist(containerID);
       return containerMap.get(containerID);
     } finally {
       lock.readLock().unlock();
@@ -197,21 +202,17 @@ public class ContainerStateMap {
    * @param containerID
    * @return Set<DatanodeDetails>
    */
-  public Set<DatanodeDetails> getContainerReplicas(ContainerID containerID)
-      throws SCMException {
+  public Set<ContainerReplica> getContainerReplicas(
+      final ContainerID containerID) throws ContainerNotFoundException {
     Preconditions.checkNotNull(containerID);
     lock.readLock().lock();
     try {
-      if (contReplicaMap.containsKey(containerID)) {
-        return Collections
-            .unmodifiableSet(contReplicaMap.get(containerID));
-      }
+      checkIfContainerExist(containerID);
+      return Collections
+          .unmodifiableSet(new HashSet<>(replicaMap.get(containerID)));
     } finally {
       lock.readLock().unlock();
     }
-    throw new SCMException(
-        "No entry exist for containerId: " + containerID + " in replica map.",
-        ResultCodes.NO_REPLICA_FOUND);
   }
 
   /**
@@ -220,26 +221,17 @@ public class ContainerStateMap {
    * ContainerId.
    *
    * @param containerID
-   * @param dnList
+   * @param replica
    */
-  public void addContainerReplica(ContainerID containerID,
-      DatanodeDetails... dnList) {
+  public void updateContainerReplica(final ContainerID containerID,
+      final ContainerReplica replica) throws ContainerNotFoundException {
     Preconditions.checkNotNull(containerID);
     lock.writeLock().lock();
     try {
-      for (DatanodeDetails dn : dnList) {
-        Preconditions.checkNotNull(dn);
-        if (contReplicaMap.containsKey(containerID)) {
-          if(!contReplicaMap.get(containerID).add(dn)) {
-            LOG.debug("ReplicaMap already contains entry for container Id: "
-                + "{},DataNode: {}", containerID, dn);
-          }
-        } else {
-          Set<DatanodeDetails> dnSet = new HashSet<>();
-          dnSet.add(dn);
-          contReplicaMap.put(containerID, dnSet);
-        }
-      }
+      checkIfContainerExist(containerID);
+      Set<ContainerReplica> replicas = replicaMap.get(containerID);
+      replicas.remove(replica);
+      replicas.add(replica);
     } finally {
       lock.writeLock().unlock();
     }
@@ -249,61 +241,45 @@ public class ContainerStateMap {
    * Remove a container Replica for given DataNode.
    *
    * @param containerID
-   * @param dn
+   * @param replica
   * @return True if dataNode is removed successfully else false.
    */
-  public boolean removeContainerReplica(ContainerID containerID,
-      DatanodeDetails dn) throws SCMException {
+  public void removeContainerReplica(final ContainerID containerID,
+      final ContainerReplica replica)
+      throws ContainerNotFoundException, ContainerReplicaNotFoundException {
     Preconditions.checkNotNull(containerID);
-    Preconditions.checkNotNull(dn);
+    Preconditions.checkNotNull(replica);
 
     lock.writeLock().lock();
     try {
-      if (contReplicaMap.containsKey(containerID)) {
-        return contReplicaMap.get(containerID).remove(dn);
+      checkIfContainerExist(containerID);
+      if(!replicaMap.get(containerID).remove(replica)) {
+        throw new ContainerReplicaNotFoundException(
+            "Container #"
+                + containerID.getId() + ", replica: " + replica);
       }
     } finally {
       lock.writeLock().unlock();
     }
-    throw new SCMException(
-        "No entry exist for containerId: " + containerID + " in replica map.",
-        ResultCodes.FAILED_TO_FIND_CONTAINER);
   }
 
   @VisibleForTesting
+  // TODO: fix the test case and remove this method!
   public static Logger getLOG() {
     return LOG;
   }
 
   /**
-   * Returns the full container Map.
-   *
-   * @return - Map
-   */
-  public Map<ContainerID, ContainerInfo> getContainerMap() {
-    lock.readLock().lock();
-    try {
-      return Collections.unmodifiableMap(containerMap);
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
-
-  /**
    * Just update the container State.
    * @param info ContainerInfo.
    */
-  public void updateContainerInfo(ContainerInfo info) throws SCMException {
-    Preconditions.checkNotNull(info);
-    ContainerInfo currentInfo = null;
+  public void updateContainerInfo(final ContainerInfo info)
+      throws ContainerNotFoundException {
     lock.writeLock().lock();
     try {
-      currentInfo = containerMap.get(
-          ContainerID.valueof(info.getContainerID()));
-
-      if (currentInfo == null) {
-        throw new SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
-      }
+      Preconditions.checkNotNull(info);
+      checkIfContainerExist(info.containerID());
+      final ContainerInfo currentInfo = containerMap.get(info.containerID());
       flushCache(info, currentInfo);
       containerMap.put(info.containerID(), info);
     } finally {
@@ -314,33 +290,23 @@ public class ContainerStateMap {
   /**
    * Update the State of a container.
    *
-   * @param info - ContainerInfo
+   * @param containerID - ContainerID
    * @param currentState - CurrentState
    * @param newState - NewState.
    * @throws SCMException - in case of failure.
    */
-  public void updateState(ContainerInfo info, LifeCycleState currentState,
-      LifeCycleState newState) throws SCMException {
+  public void updateState(ContainerID containerID, LifeCycleState currentState,
+      LifeCycleState newState) throws SCMException, ContainerNotFoundException {
     Preconditions.checkNotNull(currentState);
     Preconditions.checkNotNull(newState);
-
-    ContainerID id = new ContainerID(info.getContainerID());
-    ContainerInfo currentInfo = null;
-
     lock.writeLock().lock();
     try {
+      checkIfContainerExist(containerID);
+      final ContainerInfo currentInfo = containerMap.get(containerID);
       try {
-        // Just flush both old and new data sets from the result cache.
-        ContainerInfo newInfo = new ContainerInfo(info);
+        final ContainerInfo newInfo = new ContainerInfo(currentInfo);
         newInfo.setState(newState);
-        flushCache(newInfo, info);
-
-        currentInfo = containerMap.get(id);
 
-        if (currentInfo == null) {
-          throw new
-              SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
-        }
         // We are updating two places before this update is done, these can
         // fail independently, since the code needs to handle it.
 
@@ -351,11 +317,13 @@ public class ContainerStateMap {
         // roll back the earlier change we did. If the rollback fails, we can
         // be in an inconsistent state,
 
-        info.setState(newState);
-        containerMap.put(id, info);
-        lifeCycleStateMap.update(currentState, newState, id);
+        containerMap.put(containerID, newInfo);
+        lifeCycleStateMap.update(currentState, newState, containerID);
         LOG.trace("Updated the container {} to new state. Old = {}, new = " +
-            "{}", id, currentState, newState);
+            "{}", containerID, currentState, newState);
+
+        // Just flush both old and new data sets from the result cache.
+        flushCache(currentInfo, newInfo);
       } catch (SCMException ex) {
         LOG.error("Unable to update the container state. {}", ex);
         // we need to revert the change in this attribute since we are not
@@ -364,13 +332,13 @@ public class ContainerStateMap {
                 "old state. Old = {}, Attempted state = {}", currentState,
             newState);
 
-        containerMap.put(id, currentInfo);
+        containerMap.put(containerID, currentInfo);
 
         // if this line throws, the state map can be in an inconsistent
         // state, since we will have modified the attribute by the
         // container state will not in sync since we were not able to put
         // that into the hash table.
-        lifeCycleStateMap.update(newState, currentState, id);
+        lifeCycleStateMap.update(newState, currentState, containerID);
 
         throw new SCMException("Updating the container map failed.", ex,
             FAILED_TO_CHANGE_CONTAINER_STATE);
@@ -380,13 +348,17 @@ public class ContainerStateMap {
     }
   }
 
+  public Set<ContainerID> getAllContainerIDs() {
+    return containerMap.keySet();
+  }
+
   /**
    * Returns A list of containers owned by a name service.
    *
    * @param ownerName - Name of the NameService.
    * @return - NavigableSet of ContainerIDs.
    */
-  NavigableSet<ContainerID> getContainerIDsByOwner(String ownerName) {
+  NavigableSet<ContainerID> getContainerIDsByOwner(final String ownerName) {
     Preconditions.checkNotNull(ownerName);
     lock.readLock().lock();
     try {
@@ -402,7 +374,7 @@ public class ContainerStateMap {
    * @param type - Replication type -- StandAlone, Ratis etc.
    * @return NavigableSet
    */
-  NavigableSet<ContainerID> getContainerIDsByType(ReplicationType type) {
+  NavigableSet<ContainerID> getContainerIDsByType(final ReplicationType type) {
     Preconditions.checkNotNull(type);
     lock.readLock().lock();
     try {
@@ -418,7 +390,8 @@ public class ContainerStateMap {
    * @param factor - Replication Factor.
    * @return NavigableSet.
    */
-  NavigableSet<ContainerID> getContainerIDsByFactor(ReplicationFactor factor) {
+  NavigableSet<ContainerID> getContainerIDsByFactor(
+      final ReplicationFactor factor) {
     Preconditions.checkNotNull(factor);
     lock.readLock().lock();
     try {
@@ -435,7 +408,7 @@ public class ContainerStateMap {
    * @return List of containers by state.
    */
   public NavigableSet<ContainerID> getContainerIDsByState(
-      LifeCycleState state) {
+      final LifeCycleState state) {
     Preconditions.checkNotNull(state);
     lock.readLock().lock();
     try {
@@ -455,8 +428,8 @@ public class ContainerStateMap {
   * @return ContainerInfo or Null if no container satisfies the criteria.
    */
   public NavigableSet<ContainerID> getMatchingContainerIDs(
-      LifeCycleState state, String owner,
-      ReplicationFactor factor, ReplicationType type) {
+      final LifeCycleState state, final String owner,
+      final ReplicationFactor factor, final ReplicationType type) {
 
     Preconditions.checkNotNull(state, "State cannot be null");
     Preconditions.checkNotNull(owner, "Owner cannot be null");
@@ -465,7 +438,7 @@ public class ContainerStateMap {
 
     lock.readLock().lock();
     try {
-      ContainerQueryKey queryKey =
+      final ContainerQueryKey queryKey =
           new ContainerQueryKey(state, owner, factor, type);
       if(resultCache.containsKey(queryKey)){
         return resultCache.get(queryKey);
@@ -474,30 +447,33 @@ public class ContainerStateMap {
       // If we cannot meet any one condition we return EMPTY_SET immediately.
       // Since when we intersect these sets, the result will be empty if any
       // one is empty.
-      NavigableSet<ContainerID> stateSet =
+      final NavigableSet<ContainerID> stateSet =
           lifeCycleStateMap.getCollection(state);
       if (stateSet.size() == 0) {
         return EMPTY_SET;
       }
 
-      NavigableSet<ContainerID> ownerSet = ownerMap.getCollection(owner);
+      final NavigableSet<ContainerID> ownerSet =
+          ownerMap.getCollection(owner);
       if (ownerSet.size() == 0) {
         return EMPTY_SET;
       }
 
-      NavigableSet<ContainerID> factorSet = factorMap.getCollection(factor);
+      final NavigableSet<ContainerID> factorSet =
+          factorMap.getCollection(factor);
       if (factorSet.size() == 0) {
         return EMPTY_SET;
       }
 
-      NavigableSet<ContainerID> typeSet = typeMap.getCollection(type);
+      final NavigableSet<ContainerID> typeSet =
+          typeMap.getCollection(type);
       if (typeSet.size() == 0) {
         return EMPTY_SET;
       }
 
 
       // if we add more constraints we will just add those sets here..
-      NavigableSet<ContainerID>[] sets = sortBySize(stateSet,
+      final NavigableSet<ContainerID>[] sets = sortBySize(stateSet,
           ownerSet, factorSet, typeSet);
 
       NavigableSet<ContainerID> currentSet = sets[0];
@@ -521,12 +497,12 @@ public class ContainerStateMap {
    * @return resultSet which is the intersection of these two sets.
    */
   private NavigableSet<ContainerID> intersectSets(
-      NavigableSet<ContainerID> smaller,
-      NavigableSet<ContainerID> bigger) {
+      final NavigableSet<ContainerID> smaller,
+      final NavigableSet<ContainerID> bigger) {
     Preconditions.checkState(smaller.size() <= bigger.size(),
         "This function assumes the first set is lesser or equal to second " +
             "set");
-    NavigableSet<ContainerID> resultSet = new TreeSet<>();
+    final NavigableSet<ContainerID> resultSet = new TreeSet<>();
     for (ContainerID id : smaller) {
       if (bigger.contains(id)) {
         resultSet.add(id);
@@ -544,11 +520,11 @@ public class ContainerStateMap {
    */
   @SuppressWarnings("unchecked")
   private NavigableSet<ContainerID>[] sortBySize(
-      NavigableSet<ContainerID>... sets) {
+      final NavigableSet<ContainerID>... sets) {
     for (int x = 0; x < sets.length - 1; x++) {
       for (int y = 0; y < sets.length - x - 1; y++) {
         if (sets[y].size() > sets[y + 1].size()) {
-          NavigableSet temp = sets[y];
+          final NavigableSet temp = sets[y];
           sets[y] = sets[y + 1];
           sets[y + 1] = temp;
         }
@@ -557,13 +533,22 @@ public class ContainerStateMap {
     return sets;
   }
 
-  private void flushCache(ContainerInfo... containerInfos) {
+  private void flushCache(final ContainerInfo... containerInfos) {
     for (ContainerInfo containerInfo : containerInfos) {
-      ContainerQueryKey key = new ContainerQueryKey(containerInfo.getState(),
-          containerInfo.getOwner(), containerInfo.getReplicationFactor(),
+      final ContainerQueryKey key = new ContainerQueryKey(
+          containerInfo.getState(),
+          containerInfo.getOwner(),
+          containerInfo.getReplicationFactor(),
           containerInfo.getReplicationType());
       resultCache.remove(key);
     }
   }
 
+  private void checkIfContainerExist(ContainerID containerID)
+      throws ContainerNotFoundException {
+    if (!containerMap.containsKey(containerID)) {
+      throw new ContainerNotFoundException("#" + containerID.getId());
+    }
+  }
+
 }

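Note on updateContainerReplica above: the new replicaMap holds one Set<ContainerReplica> per container, and an update is expressed as remove-then-add. That only replaces the stale entry if ContainerReplica equality is keyed on the reporting datanode rather than on the replica's mutable state. The standalone sketch below illustrates the idiom with a hypothetical Replica class (not the HDDS ContainerReplica), assuming equality by datanode UUID.

    import java.util.HashSet;
    import java.util.Objects;
    import java.util.Set;
    import java.util.UUID;

    public final class ReplicaUpdateSketch {

      // Hypothetical stand-in for ContainerReplica: identity is the datanode,
      // sequenceId models the state carried by a newer report.
      static final class Replica {
        final UUID datanode;
        final long sequenceId;

        Replica(UUID datanode, long sequenceId) {
          this.datanode = datanode;
          this.sequenceId = sequenceId;
        }

        // Equality keyed on the datanode only, so a newer report from the same
        // datanode is "the same replica" as far as the Set is concerned.
        @Override
        public boolean equals(Object o) {
          return o instanceof Replica && datanode.equals(((Replica) o).datanode);
        }

        @Override
        public int hashCode() {
          return Objects.hash(datanode);
        }
      }

      public static void main(String[] args) {
        Set<Replica> replicas = new HashSet<>();
        UUID dn = UUID.randomUUID();
        replicas.add(new Replica(dn, 1L));

        // Remove-then-add: drops the stale entry for this datanode and stores
        // the refreshed state.
        Replica refreshed = new Replica(dn, 2L);
        replicas.remove(refreshed);
        replicas.add(refreshed);

        System.out.println(replicas.size());                          // 1
        System.out.println(replicas.iterator().next().sequenceId);    // 2
      }
    }

A plain add() would be a no-op for a key that is already present, leaving the stale entry in place; that is why the update removes the old element first.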
http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
index 17edf9e..1030428 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
@@ -21,11 +21,14 @@ package org.apache.hadoop.hdds.scm.node;
 import java.util.Set;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerException;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
@@ -37,7 +40,7 @@ import org.slf4j.LoggerFactory;
  */
 public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
 
-  private final ContainerStateManager containerStateManager;
+  private final ContainerManager containerManager;
 
   private final NodeManager nodeManager;
 
@@ -45,8 +48,8 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
       LoggerFactory.getLogger(DeadNodeHandler.class);
 
   public DeadNodeHandler(NodeManager nodeManager,
-      ContainerStateManager containerStateManager) {
-    this.containerStateManager = containerStateManager;
+      ContainerManager containerManager) {
+    this.containerManager = containerManager;
     this.nodeManager = nodeManager;
   }
 
@@ -55,45 +58,58 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
       EventPublisher publisher) {
     nodeManager.processDeadNode(datanodeDetails.getUuid());
 
-    Set<ContainerID> containers =
+    // TODO: check if there are any pipeline on this node and fire close
+    // pipeline event
+    Set<ContainerID> ids =
         nodeManager.getContainers(datanodeDetails.getUuid());
-    if (containers == null) {
+    if (ids == null) {
       LOG.info("There's no containers in dead datanode {}, no replica will be"
           + " removed from the in-memory state.", datanodeDetails.getUuid());
       return;
     }
-    LOG.info(
-        "Datanode {}  is dead. Removing replications from the in-memory 
state.",
-        datanodeDetails.getUuid());
-    for (ContainerID container : containers) {
+    LOG.info("Datanode {}  is dead. Removing replications from the in-memory" +
+            " state.", datanodeDetails.getUuid());
+    for (ContainerID id : ids) {
       try {
-        try {
-          containerStateManager.removeContainerReplica(container,
-              datanodeDetails);
-        } catch (SCMException ex) {
-          LOG.info("DataNode {} doesn't have replica for container {}.",
-              datanodeDetails.getUuid(), container.getId());
-        }
-
-        if (!containerStateManager.isOpen(container)) {
-          ReplicationRequest replicationRequest =
-              containerStateManager.checkReplicationState(container);
-
-          if (replicationRequest != null) {
-            publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
-                replicationRequest);
+        final ContainerInfo container = containerManager.getContainer(id);
+        if (!container.isOpen()) {
+          final ContainerReplica replica = ContainerReplica.newBuilder()
+              .setContainerID(id)
+              .setDatanodeDetails(datanodeDetails)
+              .build();
+          try {
+            containerManager.removeContainerReplica(id, replica);
+            replicateIfNeeded(container, publisher);
+          } catch (ContainerException ex) {
+            LOG.warn("Exception while removing container replica #{} for " +
+                "container #{}.", replica, container, ex);
           }
         }
-      } catch (SCMException e) {
-        LOG.error("Can't remove container from containerStateMap {}", container
-            .getId(), e);
+      } catch (ContainerNotFoundException cnfe) {
+        LOG.warn("Container Not found!", cnfe);
       }
     }
   }
 
   /**
+   * Compare the existing replication number with the expected one.
+   */
+  private void replicateIfNeeded(ContainerInfo container,
+      EventPublisher publisher) throws ContainerNotFoundException {
+    final int existingReplicas = containerManager
+        .getContainerReplicas(container.containerID()).size();
+    final int expectedReplicas = container.getReplicationFactor().getNumber();
+    if (existingReplicas != expectedReplicas) {
+      publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
+          new ReplicationRequest(
+              container.getContainerID(), existingReplicas, expectedReplicas));
+    }
+  }
+
+  /**
    * Returns logger.
    * */
+  // TODO: remove this.
   public static Logger getLogger() {
     return LOG;
   }

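Note on replicateIfNeeded above: once a dead datanode's replica has been dropped, the handler compares the remaining replica count against the container's replication factor and publishes a ReplicationRequest only when they differ. A minimal standalone sketch of that check follows; the ReplicationRequest class and the plain Consumer used in place of the SCM EventPublisher are hypothetical stand-ins.

    import java.util.function.Consumer;

    public final class ReplicateIfNeededSketch {

      // Hypothetical payload mirroring ReplicationRequest: container id plus
      // the observed and expected replica counts.
      static final class ReplicationRequest {
        final long containerId;
        final int existingReplicas;
        final int expectedReplicas;

        ReplicationRequest(long containerId, int existing, int expected) {
          this.containerId = containerId;
          this.existingReplicas = existing;
          this.expectedReplicas = expected;
        }

        @Override
        public String toString() {
          return "replicate container " + containerId + ": "
              + existingReplicas + "/" + expectedReplicas + " replicas";
        }
      }

      // Publish a request only when the replica count no longer matches the
      // replication factor.
      static void replicateIfNeeded(long containerId, int existingReplicas,
          int expectedReplicas, Consumer<ReplicationRequest> publisher) {
        if (existingReplicas != expectedReplicas) {
          publisher.accept(new ReplicationRequest(
              containerId, existingReplicas, expectedReplicas));
        }
      }

      public static void main(String[] args) {
        // A dead datanode dropped one of three replicas of container 42.
        replicateIfNeeded(42L, 2, 3, System.out::println);
        // Already at the expected count: nothing is published.
        replicateIfNeeded(7L, 3, 3, System.out::println);
      }
    }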
http://git-wip-us.apache.org/repos/asf/hadoop/blob/50715c06/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java
index 44744f0..7135267 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
     .NodeRegistrationContainerReport;

